You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/06/21 21:41:56 UTC
kafka git commit: KAFKA-5491;
Enable transactions in ProducerPerformance Tool
Repository: kafka
Updated Branches:
refs/heads/trunk 96587f4b1 -> bc47e9d6c
KAFKA-5491; Enable transactions in ProducerPerformance Tool
With this patch, the `ProducerPerformance` tool can create transactions of differing durations.
This patch was used to collect the initial set of benchmarks for transaction performance, documented here: https://docs.google.com/spreadsheets/d/1dHY6M7qCiX-NFvsgvaE0YoVdNq26uA8608XIh_DUpI4/edit#gid=282787170
Author: Apurva Mehta <ap...@confluent.io>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #3400 from apurvam/MINOR-add-transaction-size-to-producre-perf
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc47e9d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc47e9d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc47e9d6
Branch: refs/heads/trunk
Commit: bc47e9d6ca976ba3c15249500b2bb6f6355816bc
Parents: 96587f4
Author: Apurva Mehta <ap...@confluent.io>
Authored: Wed Jun 21 14:41:51 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jun 21 14:41:51 2017 -0700
----------------------------------------------------------------------
.../apache/kafka/tools/ProducerPerformance.java | 48 +++++++++++++++++++-
1 file changed, 47 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc47e9d6/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index da1f41b..0436d67 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -59,7 +59,10 @@ public class ProducerPerformance {
List<String> producerProps = res.getList("producerConfig");
String producerConfig = res.getString("producerConfigFile");
String payloadFilePath = res.getString("payloadFile");
+ String transactionalId = res.getString("transactionalId");
boolean shouldPrintMetrics = res.getBoolean("printMetrics");
+ long transactionDurationMs = res.getLong("transactionDurationMs");
+ boolean transactionsEnabled = 0 < transactionDurationMs;
// since default value gets printed with the help text, we are escaping \n there and replacing it with correct value here.
String payloadDelimiter = res.getString("payloadDelimiter").equals("\\n") ? "\n" : res.getString("payloadDelimiter");
@@ -99,7 +102,13 @@ public class ProducerPerformance {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
+ if (transactionsEnabled)
+ props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+
+ KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
+
+ if (transactionsEnabled)
+ producer.initTransactions();
/* setup perf test */
byte[] payload = null;
@@ -114,7 +123,16 @@ public class ProducerPerformance {
long startMs = System.currentTimeMillis();
ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
+
+ int currentTransactionSize = 0;
+ long transactionStartTime = 0;
for (int i = 0; i < numRecords; i++) {
+ if (transactionsEnabled && currentTransactionSize == 0) {
+ producer.beginTransaction();
+ transactionStartTime = System.currentTimeMillis();
+ }
+
+
if (payloadFilePath != null) {
payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
}
@@ -124,11 +142,20 @@ public class ProducerPerformance {
Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
producer.send(record, cb);
+ currentTransactionSize++;
+ if (transactionsEnabled && transactionDurationMs <= (sendStartMs - transactionStartTime)) {
+ producer.commitTransaction();
+ currentTransactionSize = 0;
+ }
+
if (throttler.shouldThrottle(i, sendStartMs)) {
throttler.throttle();
}
}
+ if (transactionsEnabled && currentTransactionSize != 0)
+ producer.commitTransaction();
+
if (!shouldPrintMetrics) {
producer.close();
@@ -246,6 +273,25 @@ public class ProducerPerformance {
.dest("printMetrics")
.help("print out metrics at the end of the test.");
+ parser.addArgument("--transactional-id")
+ .action(store())
+ .required(false)
+ .type(String.class)
+ .metavar("TRANSACTIONAL-ID")
+ .dest("transactionalId")
+ .setDefault("performance-producer-default-transactional-id")
+ .help("The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of concurrent transactions.");
+
+ parser.addArgument("--transaction-duration-ms")
+ .action(store())
+ .required(false)
+ .type(Long.class)
+ .metavar("TRANSACTION-DURATION")
+ .dest("transactionDurationMs")
+ .setDefault(0L)
+ .help("The max age of each transaction. The commitTransaction will be called after this this time has elapsed. Transactions are only enabled if this value is positive.");
+
+
return parser;
}