You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2018/11/27 20:50:17 UTC
[kafka] branch trunk updated: KAFKA-7597: Add transaction support
to ProduceBenchWorker (#5885)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9368743 KAFKA-7597: Add transaction support to ProduceBenchWorker (#5885)
9368743 is described below
commit 9368743b8fd2b42a41b44860ea0f3588bb273cc8
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Tue Nov 27 20:49:53 2018 +0000
KAFKA-7597: Add transaction support to ProduceBenchWorker (#5885)
KAFKA-7597: Add configurable transaction support to ProduceBenchWorker. In order to get support for serializing Optional<> types to JSON, add a new library: jackson-datatype-jdk8. Once Jackson 3 comes out, this library will not be needed.
Reviewers: Colin McCabe <cm...@apache.org>, Ismael Juma <is...@juma.me.uk>
---
build.gradle | 5 +
gradle/dependencies.gradle | 1 +
.../bin/trogdor-run-transactional-produce-bench.sh | 51 ++++++++++
.../services/trogdor/produce_bench_workload.py | 4 +-
tests/kafkatest/tests/core/produce_bench_test.py | 29 +++++-
.../org/apache/kafka/trogdor/common/JsonUtil.java | 2 +
.../kafka/trogdor/workload/ProduceBenchSpec.java | 37 +++++++
.../kafka/trogdor/workload/ProduceBenchWorker.java | 112 +++++++++++++++++----
.../trogdor/workload/TransactionGenerator.java | 43 ++++++++
.../workload/UniformTransactionsGenerator.java | 57 +++++++++++
.../trogdor/common/JsonSerializationTest.java | 3 +-
11 files changed, 318 insertions(+), 26 deletions(-)
diff --git a/build.gradle b/build.gradle
index 4d514df..5ce648a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -565,6 +565,7 @@ project(':core') {
dependencies {
compile project(':clients')
compile libs.jacksonDatabind
+ compile libs.jacksonJDK8Datatypes
compile libs.joptSimple
compile libs.metrics
compile libs.scalaLibrary
@@ -830,6 +831,7 @@ project(':clients') {
compile libs.snappy
compile libs.slf4jApi
compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing
+ compileOnly libs.jacksonJDK8Datatypes
jacksonDatabindConfig libs.jacksonDatabind // to publish as provided scope dependency.
@@ -839,6 +841,7 @@ project(':clients') {
testRuntime libs.slf4jlog4j
testRuntime libs.jacksonDatabind
+ testRuntime libs.jacksonJDK8Datatypes
}
task determineCommitId {
@@ -918,6 +921,7 @@ project(':tools') {
compile project(':log4j-appender')
compile libs.argparse4j
compile libs.jacksonDatabind
+ compile libs.jacksonJDK8Datatypes
compile libs.slf4jApi
compile libs.jacksonJaxrsJsonProvider
@@ -1347,6 +1351,7 @@ project(':connect:json') {
dependencies {
compile project(':connect:api')
compile libs.jacksonDatabind
+ compile libs.jacksonJDK8Datatypes
compile libs.slf4jApi
testCompile libs.easymock
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 7dd3604..59f56fc 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -103,6 +103,7 @@ libs += [
bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix",
easymock: "org.easymock:easymock:$versions.easymock",
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
+ jacksonJDK8Datatypes: "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$versions.jackson",
jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson",
jaxbApi: "javax.xml.bind:jaxb-api:$versions.jaxb",
jaxrsApi: "javax.ws.rs:javax.ws.rs-api:$versions.jaxrs",
diff --git a/tests/bin/trogdor-run-transactional-produce-bench.sh b/tests/bin/trogdor-run-transactional-produce-bench.sh
new file mode 100755
index 0000000..fd5ff0a
--- /dev/null
+++ b/tests/bin/trogdor-run-transactional-produce-bench.sh
@@ -0,0 +1,51 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+COORDINATOR_ENDPOINT="localhost:8889"
+TASK_ID="produce_bench_$RANDOM"
+TASK_SPEC=$(
+cat <<EOF
+{
+ "id": "$TASK_ID",
+ "spec": {
+ "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
+ "durationMs": 10000000,
+ "producerNode": "node0",
+ "bootstrapServers": "localhost:9092",
+ "targetMessagesPerSec": 100,
+ "maxMessages": 500,
+ "transactionGenerator" : {
+ "type" : "uniform",
+ "messagesPerTransaction" : 50
+ },
+ "activeTopics": {
+ "foo[1-3]": {
+ "numPartitions": 3,
+ "replicationFactor": 1
+ }
+ },
+ "inactiveTopics": {
+ "foo[4-5]": {
+ "numPartitions": 3,
+ "replicationFactor": 1
+ }
+ }
+ }
+}
+EOF
+)
+./bin/trogdor.sh client --create-task "${TASK_SPEC}" "${COORDINATOR_ENDPOINT}"
+echo "\$TASK_ID = $TASK_ID"
diff --git a/tests/kafkatest/services/trogdor/produce_bench_workload.py b/tests/kafkatest/services/trogdor/produce_bench_workload.py
index cf6a962..9afc814 100644
--- a/tests/kafkatest/services/trogdor/produce_bench_workload.py
+++ b/tests/kafkatest/services/trogdor/produce_bench_workload.py
@@ -21,7 +21,8 @@ from kafkatest.services.trogdor.task_spec import TaskSpec
class ProduceBenchWorkloadSpec(TaskSpec):
def __init__(self, start_ms, duration_ms, producer_node, bootstrap_servers,
target_messages_per_sec, max_messages, producer_conf, admin_client_conf,
- common_client_conf, inactive_topics, active_topics):
+ common_client_conf, inactive_topics, active_topics,
+ transaction_generator=None):
super(ProduceBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
self.message["class"] = "org.apache.kafka.trogdor.workload.ProduceBenchSpec"
self.message["producerNode"] = producer_node
@@ -29,6 +30,7 @@ class ProduceBenchWorkloadSpec(TaskSpec):
self.message["targetMessagesPerSec"] = target_messages_per_sec
self.message["maxMessages"] = max_messages
self.message["producerConf"] = producer_conf
+ self.message["transactionGenerator"] = transaction_generator
self.message["adminClientConf"] = admin_client_conf
self.message["commonClientConf"] = common_client_conf
self.message["inactiveTopics"] = inactive_topics
diff --git a/tests/kafkatest/tests/core/produce_bench_test.py b/tests/kafkatest/tests/core/produce_bench_test.py
index 125ee94..a316520 100644
--- a/tests/kafkatest/tests/core/produce_bench_test.py
+++ b/tests/kafkatest/tests/core/produce_bench_test.py
@@ -31,6 +31,8 @@ class ProduceBenchTest(Test):
self.workload_service = ProduceBenchWorkloadService(test_context, self.kafka)
self.trogdor = TrogdorService(context=self.test_context,
client_services=[self.kafka, self.workload_service])
+ self.active_topics = {"produce_bench_topic[0-1]": {"numPartitions": 1, "replicationFactor": 3}}
+ self.inactive_topics = {"produce_bench_topic[2-9]": {"numPartitions": 1, "replicationFactor": 3}}
def setUp(self):
self.trogdor.start()
@@ -43,8 +45,6 @@ class ProduceBenchTest(Test):
self.zk.stop()
def test_produce_bench(self):
- active_topics={"produce_bench_topic[0-1]":{"numPartitions":1, "replicationFactor":3}}
- inactive_topics={"produce_bench_topic[2-9]":{"numPartitions":1, "replicationFactor":3}}
spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.workload_service.producer_node,
self.workload_service.bootstrap_servers,
@@ -53,8 +53,29 @@ class ProduceBenchTest(Test):
producer_conf={},
admin_client_conf={},
common_client_conf={},
- inactive_topics=inactive_topics,
- active_topics=active_topics)
+ inactive_topics=self.inactive_topics,
+ active_topics=self.active_topics)
+ workload1 = self.trogdor.create_task("workload1", spec)
+ workload1.wait_for_done(timeout_sec=360)
+ tasks = self.trogdor.tasks()
+ self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
+
+ def test_produce_bench_transactions(self):
+ spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+ self.workload_service.producer_node,
+ self.workload_service.bootstrap_servers,
+ target_messages_per_sec=1000,
+ max_messages=100000,
+ producer_conf={},
+ admin_client_conf={},
+ common_client_conf={},
+ inactive_topics=self.inactive_topics,
+ active_topics=self.active_topics,
+ transaction_generator={
+ # 10 transactions with 10k messages
+ "type": "uniform",
+ "messagesPerTransaction": "10000"
+ })
workload1 = self.trogdor.create_task("workload1", spec)
workload1.wait_for_done(timeout_sec=360)
tasks = self.trogdor.tasks()
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java b/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java
index 70193c3..ad90ffc 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
/**
* Utilities for working with JSON.
@@ -33,6 +34,7 @@ public class JsonUtil {
JSON_SERDE = new ObjectMapper();
JSON_SERDE.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
JSON_SERDE.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
+ JSON_SERDE.registerModule(new Jdk8Module());
JSON_SERDE.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index 30878bf..c0bbd7e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -26,10 +26,39 @@ import org.apache.kafka.trogdor.task.TaskWorker;
import java.util.Collections;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
/**
* The specification for a benchmark that produces messages to a set of topics.
+ *
+ * To configure a transactional producer, a #{@link TransactionGenerator} must be passed in.
+ * Said generator works in lockstep with the producer by instructing it what action to take next in regards to a transaction.
+ *
+ * An example JSON representation which will result in a producer that creates three topics (foo1, foo2, foo3)
+ * with three partitions each and produces to them:
+ * #{@code
+ * {
+ * "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
+ * "durationMs": 10000000,
+ * "producerNode": "node0",
+ * "bootstrapServers": "localhost:9092",
+ * "targetMessagesPerSec": 10,
+ * "maxMessages": 100,
+ * "activeTopics": {
+ * "foo[1-3]": {
+ * "numPartitions": 3,
+ * "replicationFactor": 1
+ * }
+ * },
+ * "inactiveTopics": {
+ * "foo[4-5]": {
+ * "numPartitions": 3,
+ * "replicationFactor": 1
+ * }
+ * }
+ * }
+ * }
*/
public class ProduceBenchSpec extends TaskSpec {
private final String producerNode;
@@ -38,6 +67,7 @@ public class ProduceBenchSpec extends TaskSpec {
private final int maxMessages;
private final PayloadGenerator keyGenerator;
private final PayloadGenerator valueGenerator;
+ private final Optional<TransactionGenerator> transactionGenerator;
private final Map<String, String> producerConf;
private final Map<String, String> adminClientConf;
private final Map<String, String> commonClientConf;
@@ -53,6 +83,7 @@ public class ProduceBenchSpec extends TaskSpec {
@JsonProperty("maxMessages") int maxMessages,
@JsonProperty("keyGenerator") PayloadGenerator keyGenerator,
@JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
+ @JsonProperty("transactionGenerator") Optional<TransactionGenerator> txGenerator,
@JsonProperty("producerConf") Map<String, String> producerConf,
@JsonProperty("commonClientConf") Map<String, String> commonClientConf,
@JsonProperty("adminClientConf") Map<String, String> adminClientConf,
@@ -67,6 +98,7 @@ public class ProduceBenchSpec extends TaskSpec {
new SequentialPayloadGenerator(4, 0) : keyGenerator;
this.valueGenerator = valueGenerator == null ?
new ConstantPayloadGenerator(512, new byte[0]) : valueGenerator;
+ this.transactionGenerator = txGenerator == null ? Optional.empty() : txGenerator;
this.producerConf = configOrEmptyMap(producerConf);
this.commonClientConf = configOrEmptyMap(commonClientConf);
this.adminClientConf = configOrEmptyMap(adminClientConf);
@@ -107,6 +139,11 @@ public class ProduceBenchSpec extends TaskSpec {
}
@JsonProperty
+ public Optional<TransactionGenerator> transactionGenerator() {
+ return transactionGenerator;
+ }
+
+ @JsonProperty
public Map<String, String> producerConf() {
return producerConf;
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index dc749eb..abf5976 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -36,6 +36,7 @@ import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
+import org.apache.kafka.trogdor.workload.TransactionGenerator.TransactionAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,13 +44,16 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
+import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
public class ProduceBenchWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(ProduceBenchWorker.class);
@@ -179,18 +183,33 @@ public class ProduceBenchWorker implements TaskWorker {
private final PayloadIterator values;
+ private final Optional<TransactionGenerator> transactionGenerator;
+
private final Throttle throttle;
+ private Iterator<TopicPartition> partitionsIterator;
+ private Future<RecordMetadata> sendFuture;
+ private AtomicLong transactionsCommitted;
+ private boolean enableTransactions;
+
SendRecords(HashSet<TopicPartition> activePartitions) {
this.activePartitions = activePartitions;
+ this.partitionsIterator = activePartitions.iterator();
this.histogram = new Histogram(5000);
+
+ this.transactionGenerator = spec.transactionGenerator();
+ this.enableTransactions = this.transactionGenerator.isPresent();
+ this.transactionsCommitted = new AtomicLong();
+
int perPeriod = WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
this.statusUpdaterFuture = executor.scheduleWithFixedDelay(
- new StatusUpdater(histogram), 30, 30, TimeUnit.SECONDS);
+ new StatusUpdater(histogram, transactionsCommitted), 30, 30, TimeUnit.SECONDS);
+
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
- // add common client configs to producer properties, and then user-specified producer
- // configs
+ if (enableTransactions)
+ props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "produce-bench-transaction-id-" + UUID.randomUUID());
+ // add common client configs to producer properties, and then user-specified producer configs
WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.producerConf());
this.producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
this.keys = new PayloadIterator(spec.keyGenerator());
@@ -202,23 +221,29 @@ public class ProduceBenchWorker implements TaskWorker {
public Void call() throws Exception {
long startTimeMs = Time.SYSTEM.milliseconds();
try {
- Future<RecordMetadata> future = null;
try {
- Iterator<TopicPartition> iter = activePartitions.iterator();
- for (int m = 0; m < spec.maxMessages(); m++) {
- if (!iter.hasNext()) {
- iter = activePartitions.iterator();
+ if (enableTransactions)
+ producer.initTransactions();
+
+ int sentMessages = 0;
+ while (sentMessages < spec.maxMessages()) {
+ if (enableTransactions) {
+ boolean tookAction = takeTransactionAction();
+ if (tookAction)
+ continue;
}
- TopicPartition partition = iter.next();
- ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
- partition.topic(), partition.partition(), keys.next(), values.next());
- future = producer.send(record,
- new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
- throttle.increment();
+ sendMessage();
+ sentMessages++;
}
+ if (enableTransactions)
+ takeTransactionAction(); // give the transactionGenerator a chance to commit if configured evenly
+ } catch (Exception e) {
+ if (enableTransactions)
+ producer.abortTransaction();
+ throw e;
} finally {
- if (future != null) {
- future.get();
+ if (sendFuture != null) {
+ sendFuture.get();
}
producer.close();
}
@@ -226,7 +251,7 @@ public class ProduceBenchWorker implements TaskWorker {
WorkerUtils.abort(log, "SendRecords", e, doneFuture);
} finally {
statusUpdaterFuture.cancel(false);
- StatusData statusData = new StatusUpdater(histogram).update();
+ StatusData statusData = new StatusUpdater(histogram, transactionsCommitted).update();
long curTimeMs = Time.SYSTEM.milliseconds();
log.info("Sent {} total record(s) in {} ms. status: {}",
histogram.summarize().numSamples(), curTimeMs - startTimeMs, statusData);
@@ -235,6 +260,42 @@ public class ProduceBenchWorker implements TaskWorker {
return null;
}
+ private boolean takeTransactionAction() {
+ boolean tookAction = true;
+ TransactionAction nextAction = transactionGenerator.get().nextAction();
+ switch (nextAction) {
+ case BEGIN_TRANSACTION:
+ log.debug("Beginning transaction.");
+ producer.beginTransaction();
+ break;
+ case COMMIT_TRANSACTION:
+ log.debug("Committing transaction.");
+ producer.commitTransaction();
+ transactionsCommitted.getAndIncrement();
+ break;
+ case ABORT_TRANSACTION:
+ log.debug("Aborting transaction.");
+ producer.abortTransaction();
+ break;
+ case NO_OP:
+ tookAction = false;
+ break;
+ }
+ return tookAction;
+ }
+
+ private void sendMessage() throws InterruptedException {
+ if (!partitionsIterator.hasNext())
+ partitionsIterator = activePartitions.iterator();
+
+ TopicPartition partition = partitionsIterator.next();
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+ partition.topic(), partition.partition(), keys.next(), values.next());
+ sendFuture = producer.send(record,
+ new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
+ throttle.increment();
+ }
+
void recordDuration(long durationMs) {
histogram.add(durationMs);
}
@@ -242,9 +303,11 @@ public class ProduceBenchWorker implements TaskWorker {
public class StatusUpdater implements Runnable {
private final Histogram histogram;
+ private final AtomicLong transactionsCommitted;
- StatusUpdater(Histogram histogram) {
+ StatusUpdater(Histogram histogram, AtomicLong transactionsCommitted) {
this.histogram = histogram;
+ this.transactionsCommitted = transactionsCommitted;
}
@Override
@@ -261,7 +324,8 @@ public class ProduceBenchWorker implements TaskWorker {
StatusData statusData = new StatusData(summary.numSamples(), summary.average(),
summary.percentiles().get(0).value(),
summary.percentiles().get(1).value(),
- summary.percentiles().get(2).value());
+ summary.percentiles().get(2).value(),
+ transactionsCommitted.get());
status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
return statusData;
}
@@ -273,6 +337,7 @@ public class ProduceBenchWorker implements TaskWorker {
private final int p50LatencyMs;
private final int p95LatencyMs;
private final int p99LatencyMs;
+ private final long transactionsCommitted;
/**
* The percentiles to use when calculating the histogram data.
@@ -285,12 +350,14 @@ public class ProduceBenchWorker implements TaskWorker {
@JsonProperty("averageLatencyMs") float averageLatencyMs,
@JsonProperty("p50LatencyMs") int p50latencyMs,
@JsonProperty("p95LatencyMs") int p95latencyMs,
- @JsonProperty("p99LatencyMs") int p99latencyMs) {
+ @JsonProperty("p99LatencyMs") int p99latencyMs,
+ @JsonProperty("transactionsCommitted") long transactionsCommitted) {
this.totalSent = totalSent;
this.averageLatencyMs = averageLatencyMs;
this.p50LatencyMs = p50latencyMs;
this.p95LatencyMs = p95latencyMs;
this.p99LatencyMs = p99latencyMs;
+ this.transactionsCommitted = transactionsCommitted;
}
@JsonProperty
@@ -299,6 +366,11 @@ public class ProduceBenchWorker implements TaskWorker {
}
@JsonProperty
+ public long transactionsCommitted() {
+ return transactionsCommitted;
+ }
+
+ @JsonProperty
public float averageLatencyMs() {
return averageLatencyMs;
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java
new file mode 100644
index 0000000..5ec47ec
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ * Generates actions that should be taken by a producer that uses transactions.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "type")
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(value = UniformTransactionsGenerator.class, name = "uniform"),
+})
+public interface TransactionGenerator {
+ enum TransactionAction {
+ BEGIN_TRANSACTION, COMMIT_TRANSACTION, ABORT_TRANSACTION, NO_OP
+ }
+
+ /**
+ * Returns the next action that the producer should take in regards to transactions.
+ * This method should be called every time before a producer sends a message.
+ * This means that most of the time it should return #{@link TransactionAction#NO_OP}
+ * to signal the producer that its next step should be to send a message.
+ */
+ TransactionAction nextAction();
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformTransactionsGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformTransactionsGenerator.java
new file mode 100644
index 0000000..1fbfbc2
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformTransactionsGenerator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A uniform transactions generator where every N records are grouped in a separate transaction
+ */
+public class UniformTransactionsGenerator implements TransactionGenerator {
+
+ private final int messagesPerTransaction;
+ private int messagesInTransaction = -1;
+
+ @JsonCreator
+ public UniformTransactionsGenerator(@JsonProperty("messagesPerTransaction") int messagesPerTransaction) {
+ if (messagesPerTransaction < 1)
+ throw new IllegalArgumentException("Cannot have less than one message per transaction.");
+
+ this.messagesPerTransaction = messagesPerTransaction;
+ }
+
+ @JsonProperty
+ public int messagesPerTransaction() {
+ return messagesPerTransaction;
+ }
+
+ @Override
+ public synchronized TransactionAction nextAction() {
+ if (messagesInTransaction == -1) {
+ messagesInTransaction = 0;
+ return TransactionAction.BEGIN_TRANSACTION;
+ }
+ if (messagesInTransaction == messagesPerTransaction) {
+ messagesInTransaction = -1;
+ return TransactionAction.COMMIT_TRANSACTION;
+ }
+
+ messagesInTransaction += 1;
+ return TransactionAction.NO_OP;
+ }
+}
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index 5e6ff81..c324ec4 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -37,6 +37,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.junit.Assert.assertNotNull;
@@ -54,7 +55,7 @@ public class JsonSerializationTest {
verify(new WorkerRunning(null, null, 0, null));
verify(new WorkerStopping(null, null, 0, null));
verify(new ProduceBenchSpec(0, 0, null, null,
- 0, 0, null, null, null, null, null, null, null));
+ 0, 0, null, null, Optional.empty(), null, null, null, null, null));
verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, null,
0, null, null, 0));
verify(new TopicsSpec());