You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/06/21 21:41:56 UTC

kafka git commit: KAFKA-5491; Enable transactions in ProducerPerformance Tool

Repository: kafka
Updated Branches:
  refs/heads/trunk 96587f4b1 -> bc47e9d6c


KAFKA-5491; Enable transactions in ProducerPerformance Tool

With this patch, the `ProducerPerformance` tool can create transactions of differing durations.

This patch was used to collect the initial set of benchmarks for transaction performance, documented here: https://docs.google.com/spreadsheets/d/1dHY6M7qCiX-NFvsgvaE0YoVdNq26uA8608XIh_DUpI4/edit#gid=282787170

Author: Apurva Mehta <ap...@confluent.io>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #3400 from apurvam/MINOR-add-transaction-size-to-producre-perf


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc47e9d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc47e9d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc47e9d6

Branch: refs/heads/trunk
Commit: bc47e9d6ca976ba3c15249500b2bb6f6355816bc
Parents: 96587f4
Author: Apurva Mehta <ap...@confluent.io>
Authored: Wed Jun 21 14:41:51 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jun 21 14:41:51 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/tools/ProducerPerformance.java | 48 +++++++++++++++++++-
 1 file changed, 47 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bc47e9d6/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index da1f41b..0436d67 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -59,7 +59,10 @@ public class ProducerPerformance {
             List<String> producerProps = res.getList("producerConfig");
             String producerConfig = res.getString("producerConfigFile");
             String payloadFilePath = res.getString("payloadFile");
+            String transactionalId = res.getString("transactionalId");
             boolean shouldPrintMetrics = res.getBoolean("printMetrics");
+            long transactionDurationMs = res.getLong("transactionDurationMs");
+            boolean transactionsEnabled =  0 < transactionDurationMs;
 
             // since default value gets printed with the help text, we are escaping \n there and replacing it with correct value here.
             String payloadDelimiter = res.getString("payloadDelimiter").equals("\\n") ? "\n" : res.getString("payloadDelimiter");
@@ -99,7 +102,13 @@ public class ProducerPerformance {
 
             props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
             props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
+            if (transactionsEnabled)
+                props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+
+            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
+
+            if (transactionsEnabled)
+                producer.initTransactions();
 
             /* setup perf test */
             byte[] payload = null;
@@ -114,7 +123,16 @@ public class ProducerPerformance {
             long startMs = System.currentTimeMillis();
 
             ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
+
+            int currentTransactionSize = 0;
+            long transactionStartTime = 0;
             for (int i = 0; i < numRecords; i++) {
+                if (transactionsEnabled && currentTransactionSize == 0) {
+                    producer.beginTransaction();
+                    transactionStartTime = System.currentTimeMillis();
+                }
+
+
                 if (payloadFilePath != null) {
                     payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
                 }
@@ -124,11 +142,20 @@ public class ProducerPerformance {
                 Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
                 producer.send(record, cb);
 
+                currentTransactionSize++;
+                if (transactionsEnabled && transactionDurationMs <= (sendStartMs - transactionStartTime)) {
+                    producer.commitTransaction();
+                    currentTransactionSize = 0;
+                }
+
                 if (throttler.shouldThrottle(i, sendStartMs)) {
                     throttler.throttle();
                 }
             }
 
+            if (transactionsEnabled && currentTransactionSize != 0)
+                producer.commitTransaction();
+
             if (!shouldPrintMetrics) {
                 producer.close();
 
@@ -246,6 +273,25 @@ public class ProducerPerformance {
                 .dest("printMetrics")
                 .help("print out metrics at the end of the test.");
 
+        parser.addArgument("--transactional-id")
+               .action(store())
+               .required(false)
+               .type(String.class)
+               .metavar("TRANSACTIONAL-ID")
+               .dest("transactionalId")
+               .setDefault("performance-producer-default-transactional-id")
+               .help("The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of concurrent transactions.");
+
+        parser.addArgument("--transaction-duration-ms")
+               .action(store())
+               .required(false)
+               .type(Long.class)
+               .metavar("TRANSACTION-DURATION")
+               .dest("transactionDurationMs")
+               .setDefault(0L)
+               .help("The max age of each transaction. The commitTransaction will be called after this this time has elapsed. Transactions are only enabled if this value is positive.");
+
+
         return parser;
     }