Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/10 02:54:03 UTC

[GitHub] [pulsar] congbobo184 commented on a change in pull request #11933: Add transaction perf

congbobo184 commented on a change in pull request #11933:
URL: https://github.com/apache/pulsar/pull/11933#discussion_r705497211



##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
##########
@@ -333,8 +336,65 @@ public static void main(String[] args) throws Exception {
         if (isNotBlank(arguments.listenerName)) {
             clientBuilder.listenerName(arguments.listenerName);
         }
-
         PulsarClient pulsarClient = clientBuilder.build();
+        AtomicReference<Transaction> atomicReference = buildTransaction(pulsarClient, arguments);
+
+        AtomicLong messageTotal = new AtomicLong(0);
+        MessageListener<ByteBuffer> listener = (consumer, msg) -> {
+            try {
+                if (arguments.testTime > 0) {
+                    if (System.nanoTime() > testEndTime) {
+                        log.info("------------------- DONE -----------------------");
+                        printAggregatedStats();
+                        PerfClientUtils.exit(0);
+                    }
+                }
+                messagesReceived.increment();
+                bytesReceived.add(msg.size());
+
+                totalMessagesReceived.increment();
+                totalBytesReceived.add(msg.size());
+
+                if (limiter != null) {
+                    limiter.acquire();
+                }
+
+                long latencyMillis = System.currentTimeMillis() - msg.getPublishTime();
+                if (latencyMillis >= 0) {
+                    recorder.recordValue(latencyMillis);
+                    cumulativeRecorder.recordValue(latencyMillis);
+                }
+                if (arguments.isEnableTransaction) {
+                    consumer.acknowledgeAsync(msg.getMessageId(), atomicReference.get()).thenRun(() -> {
+                        totalMessageAck.increment();
+                        messageAck.increment();
+                    });
+                } else {
+                    consumer.acknowledgeAsync(msg);
+                }
+                if (arguments.poolMessages) {
+                    msg.release();
+                }
+                if (arguments.isEnableTransaction

Review comment:
       This doesn't seem to prevent acknowledging a message with an old transaction here. There appears to be a race condition.
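
One way to picture the race, and one possible way to close it, is sketched below. This is only an illustration under assumptions: `FakeTxn` and `TxnRotationSketch` are hypothetical stand-ins and do not use the Pulsar `Transaction` API. The listener acks under a read lock, and the commit-and-swap step runs under the write lock, so no ack can land on a transaction that has already been committed. With real asynchronous acks the commit would presumably also have to wait for the outstanding ack futures, which this sketch ignores.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class TxnRotationSketch {

    /** Hypothetical stand-in for a transaction; NOT the Pulsar Transaction API. */
    static final class FakeTxn {
        private volatile boolean committed;
        private final AtomicInteger acks = new AtomicInteger();

        /** Returns false when the ack arrives after commit, i.e. the race under discussion. */
        boolean ack() {
            if (committed) {
                return false;
            }
            acks.incrementAndGet();
            return true;
        }

        void commit() { committed = true; }

        int acks() { return acks.get(); }
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final int acksPerTxn;
    private final AtomicInteger lateAcks = new AtomicInteger();
    private final AtomicInteger goodAcks = new AtomicInteger();
    private FakeTxn current = new FakeTxn(); // guarded by lock

    TxnRotationSketch(int acksPerTxn) { this.acksPerTxn = acksPerTxn; }

    /** Called concurrently, like a message listener. */
    void onMessage() {
        boolean full;
        lock.readLock().lock();
        try {
            // While the read lock is held, nobody can commit or swap `current`.
            if (current.ack()) {
                goodAcks.incrementAndGet();
            } else {
                lateAcks.incrementAndGet();
            }
            full = current.acks() >= acksPerTxn;
        } finally {
            lock.readLock().unlock();
        }
        if (full) {
            rotate();
        }
    }

    private void rotate() {
        lock.writeLock().lock();
        try {
            // Re-check: another thread may already have rotated.
            if (current.acks() >= acksPerTxn) {
                current.commit();
                current = new FakeTxn();
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Runs the listener from several threads; returns {lateAcks, goodAcks}. */
    static int[] run(int threads, int messagesPerThread, int acksPerTxn) {
        TxnRotationSketch sketch = new TxnRotationSketch(acksPerTxn);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < messagesPerThread; i++) {
                    sketch.onMessage();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {sketch.lateAcks.get(), sketch.goodAcks.get()};
    }

    public static void main(String[] args) {
        int[] r = run(4, 1000, 50);
        System.out.println("late acks: " + r[0] + ", good acks: " + r[1]);
    }
}
```

Note that a transaction may collect slightly more than `acksPerTxn` acks before it is rotated, because several listener threads can ack under the read lock at the same time. A perf tool may tolerate that, but it is a trade-off of this particular sketch.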

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
##########
@@ -189,12 +193,16 @@
         public long transactionTimeout = 5;
 
         @Parameter(names = {"-nmt", "---numMessage-perTransaction"},
-                description = "the number of a transaction produced")
+                description = "the number of messages of consumed by a transaction")

Review comment:
       `The number of messages per transaction acknowledgment. (Takes effect only with --txn-enable true.)` would be better.

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
##########
@@ -333,8 +336,65 @@ public static void main(String[] args) throws Exception {
         if (isNotBlank(arguments.listenerName)) {
             clientBuilder.listenerName(arguments.listenerName);
         }
-
         PulsarClient pulsarClient = clientBuilder.build();
+        AtomicReference<Transaction> atomicReference = buildTransaction(pulsarClient, arguments);
+
+        AtomicLong messageTotal = new AtomicLong(0);
+        MessageListener<ByteBuffer> listener = (consumer, msg) -> {
+            try {
+                if (arguments.testTime > 0) {
+                    if (System.nanoTime() > testEndTime) {
+                        log.info("------------------- DONE -----------------------");
+                        printAggregatedStats();
+                        PerfClientUtils.exit(0);
+                    }
+                }
+                messagesReceived.increment();
+                bytesReceived.add(msg.size());
+
+                totalMessagesReceived.increment();
+                totalBytesReceived.add(msg.size());
+
+                if (limiter != null) {
+                    limiter.acquire();
+                }
+
+                long latencyMillis = System.currentTimeMillis() - msg.getPublishTime();
+                if (latencyMillis >= 0) {
+                    recorder.recordValue(latencyMillis);
+                    cumulativeRecorder.recordValue(latencyMillis);
+                }
+                if (arguments.isEnableTransaction) {
+                    consumer.acknowledgeAsync(msg.getMessageId(), atomicReference.get()).thenRun(() -> {
+                        totalMessageAck.increment();

Review comment:
       I don't see where totalMessageAck is used.
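
For context, a cumulative counter like this is usually only worth keeping if something reads it, typically an aggregated report at the end of the run. The sketch below is an assumption about how the pair of counters could be consumed, not the PR's actual reporting code: `AckStatsSketch`, `periodicLine`, and `aggregatedLine` are hypothetical names, and only `messageAck` and `totalMessageAck` come from the diff.

```java
import java.util.Locale;
import java.util.concurrent.atomic.LongAdder;

final class AckStatsSketch {
    // Per-interval counter: reset every time the periodic stats line is printed.
    private final LongAdder messageAck = new LongAdder();
    // Cumulative counter: never reset, read once for the aggregated stats.
    private final LongAdder totalMessageAck = new LongAdder();

    /** Would be called from the ack completion callback. */
    void onAck() {
        messageAck.increment();
        totalMessageAck.increment();
    }

    /** Periodic report: drains the interval counter and turns it into a rate. */
    String periodicLine(double elapsedSeconds) {
        long acked = messageAck.sumThenReset();
        return String.format(Locale.ROOT, "ack rate: %.1f msg/s", acked / elapsedSeconds);
    }

    /** Final report: the only place the cumulative counter is read. */
    String aggregatedLine() {
        return "total acks: " + totalMessageAck.sum();
    }

    public static void main(String[] args) {
        AckStatsSketch stats = new AckStatsSketch();
        for (int i = 0; i < 5; i++) {
            stats.onAck();
        }
        System.out.println(stats.periodicLine(1.0));
        System.out.println(stats.aggregatedLine());
    }
}
```

If nothing like `aggregatedLine` exists in the final code, the cumulative counter could simply be dropped.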

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.collect.Lists;
+
+import java.io.FileInputStream;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
+import org.HdrHistogram.Recorder;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.curator.shaded.com.google.common.util.concurrent.RateLimiter;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =
+                true)
+        public List<String> consumerTopic = Collections.singletonList("sub");

Review comment:
       The default consume topic should be `test-consume`.

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.collect.Lists;
+
+import java.io.FileInputStream;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
+import org.HdrHistogram.Recorder;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.curator.shaded.com.google.common.util.concurrent.RateLimiter;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =
+                true)
+        public List<String> consumerTopic = Collections.singletonList("sub");
+
+        @Parameter(names = "--topic-c", description = "producer will be created to produce message to this topic",
+                required = true)
+        public List<String> producerTopic = Collections.singletonList("test");
+
+        @Parameter(names = {"-threads", "--num-test-threads"}, description = "Number of test threads")
+        public int numTestThreads = 1;
+
+
+        @Parameter(names = {"--separator"}, description = "Separator between the topic and topic number")
+        public String separator = "-";
+
+        @Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL")
+        public String adminURL;
+

Review comment:
       Don't add extra blank lines.

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.collect.Lists;
+
+import java.io.FileInputStream;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
+import org.HdrHistogram.Recorder;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.curator.shaded.com.google.common.util.concurrent.RateLimiter;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =
+                true)
+        public List<String> consumerTopic = Collections.singletonList("sub");
+
+        @Parameter(names = "--topic-c", description = "producer will be created to produce message to this topic",

Review comment:
       This should be `--topic-p`, with a description like `All topics that need to be produced to in a transaction`.
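
Since both fields currently declare the same option name, the command line cannot address them separately. A small check like the one below could catch that kind of copy-paste slip. It is a sketch under assumptions: `Opt` is a hypothetical annotation standing in for the real parameter annotation, and `BrokenArgs` and `FixedArgs` are illustrative classes, not the PR's `Arguments` class.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class DuplicateOptionCheck {

    /** Hypothetical annotation; a stand-in for the real option annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface Opt {
        String[] names();
    }

    /** Mirrors the slip in the diff: both fields use --topic-c. */
    static class BrokenArgs {
        @Opt(names = "--topic-c") List<String> consumerTopic;
        @Opt(names = "--topic-c") List<String> producerTopic;
    }

    /** The suggested fix: the producer topic gets its own name. */
    static class FixedArgs {
        @Opt(names = "--topic-c") List<String> consumerTopic;
        @Opt(names = "--topic-p") List<String> producerTopic;
    }

    /** Returns every option name that is declared on more than one field. */
    static List<String> duplicates(Class<?> type) {
        Set<String> seen = new HashSet<>();
        List<String> dups = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            Opt opt = field.getAnnotation(Opt.class);
            if (opt == null) {
                continue; // not an option field
            }
            for (String name : opt.names()) {
                // Set.add returns false when the name was already registered.
                if (!seen.add(name) && !dups.contains(name)) {
                    dups.add(name);
                }
            }
        }
        return dups;
    }

    public static void main(String[] args) {
        System.out.println("broken: " + duplicates(BrokenArgs.class));
        System.out.println("fixed:  " + duplicates(FixedArgs.class));
    }
}
```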

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
##########
@@ -241,15 +262,28 @@
         @Parameter(names = {"-bw", "--busy-wait"}, description = "Enable Busy-Wait on the Pulsar client")
         public boolean enableBusyWait = false;
 
-        @Parameter(names = { "-am", "--access-mode" }, description = "Producer access mode")
+        @Parameter(names = {"-am", "--access-mode"}, description = "Producer access mode")
         public ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared;
 
-        @Parameter(names = { "-fp", "--format-payload" },
-                description = "Format %i as a message index in the stream from producer and/or %t as the timestamp nanoseconds.")
+        @Parameter(names = {"-fp", "--format-payload"},
+                description = "Format %i as a message index in the stream from producer and/or %t as the timestamp "
+                        + "nanoseconds.")
         public boolean formatPayload = false;
 
-        @Parameter(names = {"-fc", "--format-class"}, description="Custom Formatter class name")
+        @Parameter(names = {"-fc", "--format-class"}, description = "Custom Formatter class name")
         public String formatterClass = "org.apache.pulsar.testclient.DefaultMessageFormatter";
+
+        @Parameter(names = {"-to", "--txn-timeout"}, description = "transaction timeout")
+        public long transactionTimeout = 5;
+
+        @Parameter(names = {"-nmt", "---numMessage-perTransaction"},
+                description = "the number of messages produced by  a transaction")
+        public int numMessagesPerTransaction = 50;
+
+        @Parameter(names = {"-txn", "--txn-enable"}, description = " whether transactions need to  be opened ")

Review comment:
       `Enable or disable the transaction` would be better.

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,612 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.collect.Lists;
+
+import java.io.FileInputStream;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
+import org.HdrHistogram.Recorder;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.curator.shaded.com.google.common.util.concurrent.RateLimiter;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =
+                true)
+        public List<String> consumerTopic = Collections.singletonList("sub");
+
+        @Parameter(names = "--topic-c", description = "producer will be created to produce message to this topic",
+                required = true)
+        public List<String> producerTopic = Collections.singletonList("test");
+
+        @Parameter(names = {"-threads", "--num-test-threads"}, description = "Number of test threads")
+        public int numTestThreads = 1;
+
+
+
+        @Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL")
+        public String adminURL;
+
+
+        @Parameter(names = {"-u", "--service-url"}, description = "Pulsar Service URL")
+        public String serviceURL;
+
+
+        @Parameter(names = {"-np",
+                "--partitions"}, description = "Create partitioned topics with the given number of partitions, set 0 "
+                + "to not try to create the topic")
+        public Integer partitions = null;
+
+        @Parameter(names = {"-c",
+                "--max-connections"}, description = "Max number of TCP connections to a single broker")
+        public int maxConnections = 100;
+
+        @Parameter(names = {"-time",
+                "--test-duration"}, description = "Test duration in secs. If 0, it will keep publishing")
+        public long testTime = 0;
+
+
+        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+                "used for handling connections to brokers, default is 1 thread")
+        public int ioThreads = 1;
+
+        @Parameter(names = {"-ss",
+                "--subscriptions"}, description = "A list of subscriptions to consume on (e.g. sub1,sub2)")
+        public List<String> subscriptions = Collections.singletonList("sub");
+
+        @Parameter(names = {"-ns", "--num-subscriptions"}, description = "Number of subscriptions (per topic)")
+        public int numSubscriptions = 1;
+
+
+        @Parameter(names = {"-sp", "--subscription-position"}, description = "Subscription position")
+        private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
+
+
+        @Parameter(names = {"-st", "--subscription-type"}, description = "Subscription type")
+        public SubscriptionType subscriptionType = SubscriptionType.Exclusive;
+
+        @Parameter(names = {"-q", "--receiver-queue-size"}, description = "Size of the receiver queue")
+        public int receiverQueueSize = 1000;
+
+
+        @Parameter(names = {"-timeout", "--txn-timeout"}, description = "transaction timeout")
+        public long transactionTimeout = 5;
+
+        @Parameter(names = {"-nmp", "--numMessage-perTransaction-produce"},
+                description = "the number of messages produced by  a transaction")
+        public int numMessagesProducedPerTransaction = 1;
+        @Parameter(names = {"-nmc", "--numMessage-perTransaction-consume"},
+                description = "the number of messages consumed by  a transaction")
+        public int numMessagesReceivedPerTransaction = 1;
+
+        @Parameter(names = {"-ntnx",
+                "--number-txn"}, description = "the number of transaction, if o, it will keep opening")
+        public long numTransactions = 0;
+
+        @Parameter(names = {"-txn", "--txn-enable"}, description = " whether transactions need to  be opened ")
+        public boolean isEnableTransaction = false;
+
+        @Parameter(names = {"-end"}, description = " how to end a transaction, commit or abort")
+        public boolean isCommitedTransaction = true;
+
+        @Parameter(names = "-txnRate", description = "the rate of transaction  open, if 0 , don`t limit")
+        public int openTxnRate = 0;
+
+
+    }
+
+    public static void main(String[] args)
+            throws IOException, PulsarAdminException, ExecutionException, InterruptedException {
+        final Arguments arguments = new Arguments();
+        JCommander jc = new JCommander(arguments);
+        jc.setProgramName("pulsar-perf produce");
+
+        try {
+            jc.parse(args);
+        } catch (ParameterException e) {
+            System.out.println(e.getMessage());
+            jc.usage();
+            PerfClientUtils.exit(-1);
+        }
+
+        if (arguments.help) {
+            jc.usage();
+            PerfClientUtils.exit(-1);
+        }
+
+
+        if (arguments.confFile != null) {
+            Properties prop = new Properties(System.getProperties());
+            prop.load(new FileInputStream(arguments.confFile));
+
+            if (arguments.serviceURL == null) {
+                arguments.serviceURL = prop.getProperty("brokerServiceUrl");
+            }
+
+            if (arguments.serviceURL == null) {
+                arguments.serviceURL = prop.getProperty("webServiceUrl");
+            }
+
+            // fallback to previous-version serviceUrl property to maintain backward-compatibility
+            if (arguments.serviceURL == null) {
+                arguments.serviceURL = prop.getProperty("serviceUrl", "http://localhost:8080/");
+            }
+
+            if (arguments.adminURL == null) {
+                arguments.adminURL = prop.getProperty("webServiceUrl");
+            }
+            if (arguments.adminURL == null) {
+                arguments.adminURL = prop.getProperty("adminURL", "http://localhost:8080/");
+            }
+        }
+
+
+        // Dump config variables
+        PerfClientUtils.printJVMInformation(log);
+
+        ObjectMapper m = new ObjectMapper();
+        ObjectWriter w = m.writerWithDefaultPrettyPrinter();
+        log.info("Starting Pulsar perf transaction with config: {}", w.writeValueAsString(arguments));
+
+        final byte[] payloadBytes = new byte[1024];
+        Random random = new Random(0);
+        for (int i = 0; i < payloadBytes.length; ++i) {
+            payloadBytes[i] = (byte) (random.nextInt(26) + 65);
+        }
+        if (arguments.partitions != null) {
+            PulsarAdminBuilder clientBuilder = PulsarAdmin.builder()
+                    .serviceHttpUrl(arguments.adminURL);
+
+
+            try (PulsarAdmin client = clientBuilder.build()) {
+                for (String topic : arguments.producerTopic) {
+                    log.info("Creating  produce partitioned topic {} with {} partitions", topic, arguments.partitions);
+                    try {
+                        client.topics().createPartitionedTopic(topic, arguments.partitions);
+                    } catch (PulsarAdminException.ConflictException alreadyExists) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Topic {} already exists: {}", topic, alreadyExists);
+                        }
+                        PartitionedTopicMetadata partitionedTopicMetadata =
+                                client.topics().getPartitionedTopicMetadata(topic);
+                        if (partitionedTopicMetadata.partitions != arguments.partitions) {
+                            log.error(
+                                    "Topic {} already exists but it has a wrong number of partitions: {}, expecting {}",
+                                    topic, partitionedTopicMetadata.partitions, arguments.partitions);
+                            PerfClientUtils.exit(-1);
+                        }
+                    }
+                }
+                for (String topic : arguments.consumerTopic) {
+                    log.info("Creating  consume partitioned topic {} with {} partitions", topic, arguments.partitions);
+                    try {
+                        client.topics().createPartitionedTopic(topic, arguments.partitions);
+                    } catch (PulsarAdminException.ConflictException alreadyExists) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Topic {} already exists: {}", topic, alreadyExists);
+                        }
+                        PartitionedTopicMetadata partitionedTopicMetadata =
+                                client.topics().getPartitionedTopicMetadata(topic);
+                        if (partitionedTopicMetadata.partitions != arguments.partitions) {
+                            log.error(
+                                    "Topic {} already exists but it has a wrong number of partitions: {}, expecting {}",
+                                    topic, partitionedTopicMetadata.partitions, arguments.partitions);
+                            PerfClientUtils.exit(-1);
+                        }
+                    }
+                }
+            }
+        }
+
+
+        PulsarClient client =
+                PulsarClient.builder().enableTransaction(arguments.isEnableTransaction).serviceUrl(arguments.serviceURL)
+                        .connectionsPerBroker(arguments.maxConnections)
+                        .ioThreads(arguments.ioThreads)
+                        .statsInterval(0, TimeUnit.SECONDS)
+                        .ioThreads(arguments.ioThreads).build();
+
+        ProducerBuilder<byte[]> producerBuilder = client.newProducer()//
+                .sendTimeout(0, TimeUnit.SECONDS);
+
+        final List<Future<Producer<byte[]>>> producerFutures = Lists.newArrayList();
+
+
+        ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer(Schema.BYTES) //
+                .subscriptionType(arguments.subscriptionType)
+                .receiverQueueSize(arguments.receiverQueueSize)
+                .subscriptionInitialPosition(arguments.subscriptionInitialPosition)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+
+
+        Iterator<String> consumerTopicIterator = arguments.consumerTopic.iterator();
+        Iterator<String> producerTopicIterator = arguments.producerTopic.iterator();
+
+        final List<List<Consumer<byte[]>>> consumers = Lists.newArrayList();
+        for (int i = 0; i < arguments.numTestThreads; i++) {
+            final List<Consumer<byte[]>> subscriptions = Lists.newArrayListWithCapacity(arguments.numSubscriptions);

Review comment:
       Maybe we shouldn't add two lists

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.collect.Lists;
+
+import java.io.FileInputStream;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
+import org.HdrHistogram.Recorder;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.curator.shaded.com.google.common.util.concurrent.RateLimiter;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =
+                true)
+        public List<String> consumerTopic = Collections.singletonList("sub");
+
+        @Parameter(names = "--topic-c", description = "producer will be created to produce message to this topic",
+                required = true)
+        public List<String> producerTopic = Collections.singletonList("test");
+
+        @Parameter(names = {"-threads", "--num-test-threads"}, description = "Number of test threads")
+        public int numTestThreads = 1;
+
+
+        @Parameter(names = {"--separator"}, description = "Separator between the topic and topic number")
+        public String separator = "-";
+
+        @Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL")
+        public String adminURL;
+
+
+        @Parameter(names = {"-u", "--service-url"}, description = "Pulsar Service URL")
+        public String serviceURL;
+
+
+        @Parameter(names = {"-np",
+                "--partitions"}, description = "Create partitioned topics with the given number of partitions, set 0 "
+                + "to not try to create the topic")
+        public Integer partitions = null;
+
+        @Parameter(names = {"-c",
+                "--max-connections"}, description = "Max number of TCP connections to a single broker")
+        public int maxConnections = 100;
+
+        @Parameter(names = {"-time",
+                "--test-duration"}, description = "Test duration in secs. If 0, it will keep publishing")
+        public long testTime = 0;
+
+
+        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+                "used for handling connections to brokers, default is 1 thread")
+        public int ioThreads = 1;
+
+        @Parameter(names = {"-ss",
+                "--subscriptions"}, description = "A list of subscriptions to consume on (e.g. sub1,sub2)")
+        public List<String> subscriptions = Collections.singletonList("sub");
+
+        @Parameter(names = {"-ns", "--num-subscriptions"}, description = "Number of subscriptions (per topic)")
+        public int numSubscriptions = 1;
+
+
+        @Parameter(names = {"-sp", "--subscription-position"}, description = "Subscription position")
+        private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
+
+
+        @Parameter(names = {"-st", "--subscription-type"}, description = "Subscription type")
+        public SubscriptionType subscriptionType = SubscriptionType.Exclusive;
+
+        @Parameter(names = {"-q", "--receiver-queue-size"}, description = "Size of the receiver queue")
+        public int receiverQueueSize = 1000;
+
+
+        @Parameter(names = {"-timeout", "--txn-timeout"}, description = "transaction timeout")
+        public long transactionTimeout = 5;
+
+        @Parameter(names = {"-nmp", "--numMessage-perTransaction-produce"},
+                description = "the number of messages produced by  a transaction")
+        public int numMessagesProducedPerTransaction = 1;
+        @Parameter(names = {"-nmc", "--numMessage-perTransaction-consume"},
+                description = "the number of messages consumed by  a transaction")
+        public int numMessagesReceivedPerTransaction = 1;
+
+        @Parameter(names = {"-ntnx",
+                "--number-txn"}, description = "the number of transaction, if o, it will keep opening")
+        public long numTransactions = 0;
+
+        @Parameter(names = {"-txn", "--txn-enable"}, description = " whether transactions need to  be opened ")
+        public boolean isEnableTransaction = false;
+
+        @Parameter(names = {"-end"}, description = " how to end a transaction, commit or abort")

Review comment:
       `Whether to commit or abort the transaction. (Only takes effect with "--txn-enable true")` is better

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
##########
@@ -189,12 +193,16 @@
         public long transactionTimeout = 5;
 
         @Parameter(names = {"-nmt", "---numMessage-perTransaction"},
-                description = "the number of a transaction produced")
+                description = "the number of messages of consumed by a transaction")
         public int numMessagesPerTransaction = 50;
 
-        @Parameter(names = {"-txn", "--txn-enable"}, description = "transaction enable")
+        @Parameter(names = {"-txn", "--txn-enable"}, description = "whether transactions need to  be opened")
         public boolean isEnableTransaction = false;
 
+        @Parameter(names = {"-end"}, description = " how to end a transaction, commit or abort")

Review comment:
       `Whether to commit or abort the transaction. (Only takes effect with "--txn-enable true")` is better

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
##########
@@ -67,120 +68,149 @@
     private static final LongAdder totalMessagesReceived = new LongAdder();
     private static final LongAdder totalBytesReceived = new LongAdder();
 
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
     private static Recorder recorder = new Recorder(TimeUnit.DAYS.toMillis(10), 5);
     private static Recorder cumulativeRecorder = new Recorder(TimeUnit.DAYS.toMillis(10), 5);
 
     @Parameters(commandDescription = "Test pulsar consumer performance.")
     static class Arguments {
 
-        @Parameter(names = { "-h", "--help" }, description = "Help message", help = true)

Review comment:
       Do not change the formatting; the same applies to all the following lines. You should probably double-check before submitting.

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
##########
@@ -189,12 +193,16 @@
         public long transactionTimeout = 5;
 
         @Parameter(names = {"-nmt", "---numMessage-perTransaction"},
-                description = "the number of a transaction produced")
+                description = "the number of messages of consumed by a transaction")
         public int numMessagesPerTransaction = 50;
 
-        @Parameter(names = {"-txn", "--txn-enable"}, description = "transaction enable")
+        @Parameter(names = {"-txn", "--txn-enable"}, description = "whether transactions need to  be opened")

Review comment:
       `Enable or disable the transaction` is better.

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
##########
@@ -284,38 +316,9 @@ public static void main(String[] args) throws Exception {
         final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
         long startTime = System.nanoTime();
         long testEndTime = startTime + (long) (arguments.testTime * 1e9);
-        MessageListener<ByteBuffer> listener = (consumer, msg) -> {

Review comment:
       I think this code should not be moved; moving it makes the review difficult.

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,613 @@
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =
+                true)
+        public List<String> consumerTopic = Collections.singletonList("sub");
+
+        @Parameter(names = "--topic-c", description = "producer will be created to produce message to this topic",
+                required = true)
+        public List<String> producerTopic = Collections.singletonList("test");

Review comment:
       The default produce topic should be `test-produce`.

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
##########
@@ -333,8 +336,65 @@ public static void main(String[] args) throws Exception {
         if (isNotBlank(arguments.listenerName)) {
             clientBuilder.listenerName(arguments.listenerName);
         }
-
         PulsarClient pulsarClient = clientBuilder.build();
+        AtomicReference<Transaction> atomicReference = buildTransaction(pulsarClient, arguments);
+
+        AtomicLong messageTotal = new AtomicLong(0);
+        MessageListener<ByteBuffer> listener = (consumer, msg) -> {
+            try {
+                if (arguments.testTime > 0) {
+                    if (System.nanoTime() > testEndTime) {
+                        log.info("------------------- DONE -----------------------");
+                        printAggregatedStats();
+                        PerfClientUtils.exit(0);
+                    }
+                }
+                messagesReceived.increment();
+                bytesReceived.add(msg.size());
+
+                totalMessagesReceived.increment();
+                totalBytesReceived.add(msg.size());
+
+                if (limiter != null) {
+                    limiter.acquire();
+                }
+
+                long latencyMillis = System.currentTimeMillis() - msg.getPublishTime();
+                if (latencyMillis >= 0) {
+                    recorder.recordValue(latencyMillis);
+                    cumulativeRecorder.recordValue(latencyMillis);
+                }
+                if (arguments.isEnableTransaction) {
+                    consumer.acknowledgeAsync(msg.getMessageId(), atomicReference.get()).thenRun(() -> {
+                        totalMessageAck.increment();
+                        messageAck.increment();
+                    });
+                } else {
+                    consumer.acknowledgeAsync(msg);
+                }
+                if (arguments.poolMessages) {
+                    msg.release();
+                }
+                if (arguments.isEnableTransaction
+                        && messageTotal.incrementAndGet() >= arguments.numMessagesPerTransaction) {
+                    Transaction transaction = atomicReference.get();
+                    if (atomicReference.compareAndSet(transaction, pulsarClient.newTransaction().
+                            withTransactionTimeout(arguments.transactionTimeout, TimeUnit.SECONDS).build().get())) {
+                        messageTotal.set(0);
+                        if (arguments.isCommitedTransaction) {
+                            transaction.commit();

Review comment:
       Maybe we should add statistics for the number of commits and aborts
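
A minimal sketch of what such statistics could look like, using only the JDK so it runs on its own. The class `TxnEndStatsSketch`, the counters `totalCommits`, `totalAborts` and `totalEndFailures`, and the helper `track` are hypothetical names, not part of this PR. The `endFuture` argument stands in for the `CompletableFuture<Void>` returned by `transaction.commit()` or `transaction.abort()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper (not part of the PR): counts how transactions end.
class TxnEndStatsSketch {
    static final LongAdder totalCommits = new LongAdder();
    static final LongAdder totalAborts = new LongAdder();
    static final LongAdder totalEndFailures = new LongAdder();

    // `endFuture` stands in for the future returned by commit() or abort();
    // `commit` says which of the two operations was requested.
    static CompletableFuture<Void> track(CompletableFuture<Void> endFuture, boolean commit) {
        return endFuture.whenComplete((ignored, error) -> {
            if (error != null) {
                totalEndFailures.increment();   // the commit or abort itself failed
            } else if (commit) {
                totalCommits.increment();
            } else {
                totalAborts.increment();
            }
        });
    }

    public static void main(String[] args) {
        // Simulated outcomes: one commit, one abort, one failed commit.
        track(CompletableFuture.completedFuture(null), true);
        track(CompletableFuture.completedFuture(null), false);
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("simulated commit failure"));
        track(failed, true);

        System.out.println("commits=" + totalCommits.sum()
                + " aborts=" + totalAborts.sum()
                + " failures=" + totalEndFailures.sum());
    }
}
```

In the listener this would presumably wrap the existing call, for example `track(transaction.commit(), true)`, and the sums could be printed next to the other aggregated stats.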

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,613 @@
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =

Review comment:
       `All topics that need ack for a transaction` is better

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
##########
@@ -333,8 +336,65 @@ public static void main(String[] args) throws Exception {
         if (isNotBlank(arguments.listenerName)) {
             clientBuilder.listenerName(arguments.listenerName);
         }
-
         PulsarClient pulsarClient = clientBuilder.build();
+        AtomicReference<Transaction> atomicReference = buildTransaction(pulsarClient, arguments);
+
+        AtomicLong messageTotal = new AtomicLong(0);
+        MessageListener<ByteBuffer> listener = (consumer, msg) -> {
+            try {
+                if (arguments.testTime > 0) {
+                    if (System.nanoTime() > testEndTime) {
+                        log.info("------------------- DONE -----------------------");
+                        printAggregatedStats();
+                        PerfClientUtils.exit(0);
+                    }
+                }
+                messagesReceived.increment();
+                bytesReceived.add(msg.size());
+
+                totalMessagesReceived.increment();
+                totalBytesReceived.add(msg.size());
+
+                if (limiter != null) {
+                    limiter.acquire();
+                }
+
+                long latencyMillis = System.currentTimeMillis() - msg.getPublishTime();
+                if (latencyMillis >= 0) {
+                    recorder.recordValue(latencyMillis);
+                    cumulativeRecorder.recordValue(latencyMillis);
+                }
+                if (arguments.isEnableTransaction) {
+                    consumer.acknowledgeAsync(msg.getMessageId(), atomicReference.get()).thenRun(() -> {

Review comment:
       Maybe we should add ack failure statistics
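
A rough sketch of one way to do this, assuming a second counter next to `totalMessageAck`. The class `AckStatsSketch`, the counter `totalMessageAckFailed`, and the helper `countAck` are hypothetical names, not part of this PR. The `ackFuture` argument stands in for the future returned by `consumer.acknowledgeAsync(...)`, so the example needs only the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper (not part of the PR): counts successful and failed acks.
class AckStatsSketch {
    static final LongAdder totalMessageAck = new LongAdder();
    static final LongAdder totalMessageAckFailed = new LongAdder();  // assumed new counter

    // `ackFuture` stands in for the result of consumer.acknowledgeAsync(...).
    static CompletableFuture<Void> countAck(CompletableFuture<Void> ackFuture) {
        return ackFuture
                .thenRun(totalMessageAck::increment)      // runs only when the ack succeeded
                .exceptionally(error -> {                 // runs only when the ack failed
                    totalMessageAckFailed.increment();
                    return null;
                });
    }

    public static void main(String[] args) {
        // Simulated outcomes: one successful ack, one failed ack.
        countAck(CompletableFuture.completedFuture(null));
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("simulated ack failure"));
        countAck(failed);

        System.out.println("acked=" + totalMessageAck.sum()
                + " ackFailed=" + totalMessageAckFailed.sum());
    }
}
```

Keeping `thenRun` for the success path mirrors the code under review; the `exceptionally` stage only adds the failure count and swallows the error, which may or may not be what the perf tool wants.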

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,613 @@
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =
+                true)
+        public List<String> consumerTopic = Collections.singletonList("sub");
+
+        @Parameter(names = "--topic-c", description = "producer will be created to produce message to this topic",
+                required = true)
+        public List<String> producerTopic = Collections.singletonList("test");
+
+        @Parameter(names = {"-threads", "--num-test-threads"}, description = "Number of test threads")
+        public int numTestThreads = 1;
+
+
+        @Parameter(names = {"--separator"}, description = "Separator between the topic and topic number")
+        public String separator = "-";
+
+        @Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL")
+        public String adminURL;
+
+
+        @Parameter(names = {"-u", "--service-url"}, description = "Pulsar Service URL")
+        public String serviceURL;
+
+
+        @Parameter(names = {"-np",
+                "--partitions"}, description = "Create partitioned topics with the given number of partitions, set 0 "
+                + "to not try to create the topic")
+        public Integer partitions = null;
+
+        @Parameter(names = {"-c",
+                "--max-connections"}, description = "Max number of TCP connections to a single broker")
+        public int maxConnections = 100;
+
+        @Parameter(names = {"-time",
+                "--test-duration"}, description = "Test duration in secs. If 0, it will keep publishing")
+        public long testTime = 0;
+
+
+        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+                "used for handling connections to brokers, default is 1 thread")
+        public int ioThreads = 1;
+
+        @Parameter(names = {"-ss",
+                "--subscriptions"}, description = "A list of subscriptions to consume on (e.g. sub1,sub2)")
+        public List<String> subscriptions = Collections.singletonList("sub");
+
+        @Parameter(names = {"-ns", "--num-subscriptions"}, description = "Number of subscriptions (per topic)")
+        public int numSubscriptions = 1;
+
+
+        @Parameter(names = {"-sp", "--subscription-position"}, description = "Subscription position")
+        private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
+
+
+        @Parameter(names = {"-st", "--subscription-type"}, description = "Subscription type")
+        public SubscriptionType subscriptionType = SubscriptionType.Exclusive;
+
+        @Parameter(names = {"-q", "--receiver-queue-size"}, description = "Size of the receiver queue")
+        public int receiverQueueSize = 1000;
+
+
+        @Parameter(names = {"-timeout", "--txn-timeout"}, description = "transaction timeout")
+        public long transactionTimeout = 5;
+
+        @Parameter(names = {"-nmp", "--numMessage-perTransaction-produce"},
+                description = "the number of messages produced by  a transaction")
+        public int numMessagesProducedPerTransaction = 1;
+        @Parameter(names = {"-nmc", "--numMessage-perTransaction-consume"},
+                description = "the number of messages consumed by  a transaction")
+        public int numMessagesReceivedPerTransaction = 1;

Review comment:
       Rename this to `numMessagesAckedPerTransaction`.

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,613 @@
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =
+                true)
+        public List<String> consumerTopic = Collections.singletonList("sub");
+
+        @Parameter(names = "--topic-c", description = "producer will be created to produce message to this topic",
+                required = true)
+        public List<String> producerTopic = Collections.singletonList("test");
+
+        @Parameter(names = {"-threads", "--num-test-threads"}, description = "Number of test threads")
+        public int numTestThreads = 1;
+
+
+        @Parameter(names = {"--separator"}, description = "Separator between the topic and topic number")
+        public String separator = "-";
+
+        @Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL")
+        public String adminURL;
+
+
+        @Parameter(names = {"-u", "--service-url"}, description = "Pulsar Service URL")
+        public String serviceURL;
+
+
+        @Parameter(names = {"-np",
+                "--partitions"}, description = "Create partitioned topics with the given number of partitions, set 0 "
+                + "to not try to create the topic")
+        public Integer partitions = null;
+
+        @Parameter(names = {"-c",
+                "--max-connections"}, description = "Max number of TCP connections to a single broker")
+        public int maxConnections = 100;
+
+        @Parameter(names = {"-time",
+                "--test-duration"}, description = "Test duration in secs. If 0, it will keep publishing")
+        public long testTime = 0;
+
+
+        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+                "used for handling connections to brokers, default is 1 thread")
+        public int ioThreads = 1;
+
+        @Parameter(names = {"-ss",
+                "--subscriptions"}, description = "A list of subscriptions to consume on (e.g. sub1,sub2)")
+        public List<String> subscriptions = Collections.singletonList("sub");
+
+        @Parameter(names = {"-ns", "--num-subscriptions"}, description = "Number of subscriptions (per topic)")
+        public int numSubscriptions = 1;
+
+
+        @Parameter(names = {"-sp", "--subscription-position"}, description = "Subscription position")
+        private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
+
+
+        @Parameter(names = {"-st", "--subscription-type"}, description = "Subscription type")
+        public SubscriptionType subscriptionType = SubscriptionType.Exclusive;
+
+        @Parameter(names = {"-q", "--receiver-queue-size"}, description = "Size of the receiver queue")
+        public int receiverQueueSize = 1000;
+
+
+        @Parameter(names = {"-timeout", "--txn-timeout"}, description = "transaction timeout")
+        public long transactionTimeout = 5;
+
+        @Parameter(names = {"-nmp", "--numMessage-perTransaction-produce"},
+                description = "the number of messages produced by  a transaction")
+        public int numMessagesProducedPerTransaction = 1;
+        @Parameter(names = {"-nmc", "--numMessage-perTransaction-consume"},

Review comment:
       The number of messages acknowledged per transaction. (Takes effect only with "--txn-enable true".)

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
##########
@@ -241,15 +262,28 @@
         @Parameter(names = {"-bw", "--busy-wait"}, description = "Enable Busy-Wait on the Pulsar client")
         public boolean enableBusyWait = false;
 
-        @Parameter(names = { "-am", "--access-mode" }, description = "Producer access mode")
+        @Parameter(names = {"-am", "--access-mode"}, description = "Producer access mode")
         public ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared;
 
-        @Parameter(names = { "-fp", "--format-payload" },
-                description = "Format %i as a message index in the stream from producer and/or %t as the timestamp nanoseconds.")
+        @Parameter(names = {"-fp", "--format-payload"},
+                description = "Format %i as a message index in the stream from producer and/or %t as the timestamp "
+                        + "nanoseconds.")
         public boolean formatPayload = false;
 
-        @Parameter(names = {"-fc", "--format-class"}, description="Custom Formatter class name")
+        @Parameter(names = {"-fc", "--format-class"}, description = "Custom Formatter class name")
         public String formatterClass = "org.apache.pulsar.testclient.DefaultMessageFormatter";
+
+        @Parameter(names = {"-to", "--txn-timeout"}, description = "transaction timeout")
+        public long transactionTimeout = 5;
+
+        @Parameter(names = {"-nmt", "---numMessage-perTransaction"},
+                description = "the number of messages produced by  a transaction")
+        public int numMessagesPerTransaction = 50;
+
+        @Parameter(names = {"-txn", "--txn-enable"}, description = " whether transactions need to  be opened ")
+        public boolean isEnableTransaction = false;
+        @Parameter(names = {"-end"}, description = " how to end a transaction, commit or abort")

Review comment:
       A better description would be: Whether to commit or abort the transaction. (Takes effect only with "--txn-enable true".)

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
##########
@@ -659,6 +723,27 @@ private static void runProducer(int producerId,
                             recorder.recordValue(latencyMicros);
                             cumulativeRecorder.recordValue(latencyMicros);
                         }
+                        if (arguments.isEnableTransaction

Review comment:
       Same problem as PerformanceConsumer

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,613 @@
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =
+                true)
+        public List<String> consumerTopic = Collections.singletonList("sub");
+
+        @Parameter(names = "--topic-c", description = "producer will be created to produce message to this topic",
+                required = true)
+        public List<String> producerTopic = Collections.singletonList("test");
+
+        @Parameter(names = {"-threads", "--num-test-threads"}, description = "Number of test threads")
+        public int numTestThreads = 1;
+
+
+        @Parameter(names = {"--separator"}, description = "Separator between the topic and topic number")
+        public String separator = "-";
+
+        @Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL")
+        public String adminURL;
+
+
+        @Parameter(names = {"-u", "--service-url"}, description = "Pulsar Service URL")
+        public String serviceURL;
+
+
+        @Parameter(names = {"-np",
+                "--partitions"}, description = "Create partitioned topics with the given number of partitions, set 0 "
+                + "to not try to create the topic")
+        public Integer partitions = null;
+
+        @Parameter(names = {"-c",
+                "--max-connections"}, description = "Max number of TCP connections to a single broker")
+        public int maxConnections = 100;
+
+        @Parameter(names = {"-time",
+                "--test-duration"}, description = "Test duration in secs. If 0, it will keep publishing")
+        public long testTime = 0;
+
+
+        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+                "used for handling connections to brokers, default is 1 thread")
+        public int ioThreads = 1;
+
+        @Parameter(names = {"-ss",
+                "--subscriptions"}, description = "A list of subscriptions to consume on (e.g. sub1,sub2)")
+        public List<String> subscriptions = Collections.singletonList("sub");
+
+        @Parameter(names = {"-ns", "--num-subscriptions"}, description = "Number of subscriptions (per topic)")
+        public int numSubscriptions = 1;
+
+
+        @Parameter(names = {"-sp", "--subscription-position"}, description = "Subscription position")
+        private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
+
+
+        @Parameter(names = {"-st", "--subscription-type"}, description = "Subscription type")
+        public SubscriptionType subscriptionType = SubscriptionType.Exclusive;
+
+        @Parameter(names = {"-q", "--receiver-queue-size"}, description = "Size of the receiver queue")
+        public int receiverQueueSize = 1000;
+
+
+        @Parameter(names = {"-timeout", "--txn-timeout"}, description = "transaction timeout")
+        public long transactionTimeout = 5;
+
+        @Parameter(names = {"-nmp", "--numMessage-perTransaction-produce"},
+                description = "the number of messages produced by  a transaction")
+        public int numMessagesProducedPerTransaction = 1;
+        @Parameter(names = {"-nmc", "--numMessage-perTransaction-consume"},
+                description = "the number of messages consumed by  a transaction")
+        public int numMessagesReceivedPerTransaction = 1;
+
+        @Parameter(names = {"-ntnx",
+                "--number-txn"}, description = "the number of transaction, if o, it will keep opening")

Review comment:
       The number of transactions; if 0, it will keep opening transactions.

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,613 @@
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =
+                true)
+        public List<String> consumerTopic = Collections.singletonList("sub");
+
+        @Parameter(names = "--topic-c", description = "producer will be created to produce message to this topic",
+                required = true)
+        public List<String> producerTopic = Collections.singletonList("test");
+
+        @Parameter(names = {"-threads", "--num-test-threads"}, description = "Number of test threads")

Review comment:
       Each of these threads opens a new transaction, acks the `consumerTopic` messages, produces messages to `producerTopic`, and then commits or aborts the transaction. Increasing the number of threads increases the parallelism of the performance test and therefore the intensity of the stress test.
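
       To illustrate the threading model described above, here is a rough, self-contained sketch. It does not call the Pulsar client: the transaction body is a placeholder comment, and the class name and the `run` helper are hypothetical. Only `numTestThreads` and `totalNumTransaction` are names taken from the diff.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class TxnWorkerSketch {
    static final LongAdder totalNumTransaction = new LongAdder();

    // Runs numTestThreads workers in parallel; each one completes
    // txnsPerThread simulated transactions. Returns the total count.
    static long run(int numTestThreads, int txnsPerThread) {
        totalNumTransaction.reset();
        ExecutorService executor = Executors.newFixedThreadPool(numTestThreads);
        for (int i = 0; i < numTestThreads; i++) {
            executor.submit(() -> {
                for (int t = 0; t < txnsPerThread; t++) {
                    // Placeholder for the real work of one transaction:
                    // open a transaction, ack consumerTopic messages,
                    // produce to producerTopic, then commit or abort.
                    totalNumTransaction.increment();
                }
            });
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return totalNumTransaction.sum();
    }

    public static void main(String[] args) {
        System.out.println("transactions completed: " + run(4, 100));
    }
}
```

       With more threads, more transactions are in flight at once, which is what raises the load on the broker.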

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,613 @@
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =
+                true)
+        public List<String> consumerTopic = Collections.singletonList("sub");
+
+        @Parameter(names = "--topic-c", description = "producer will be created to produce message to this topic",
+                required = true)
+        public List<String> producerTopic = Collections.singletonList("test");
+
+        @Parameter(names = {"-threads", "--num-test-threads"}, description = "Number of test threads")
+        public int numTestThreads = 1;
+
+
+        @Parameter(names = {"--separator"}, description = "Separator between the topic and topic number")
+        public String separator = "-";
+
+        @Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL")
+        public String adminURL;
+
+
+        @Parameter(names = {"-u", "--service-url"}, description = "Pulsar Service URL")
+        public String serviceURL;
+
+
+        @Parameter(names = {"-np",
+                "--partitions"}, description = "Create partitioned topics with the given number of partitions, set 0 "
+                + "to not try to create the topic")
+        public Integer partitions = null;
+
+        @Parameter(names = {"-c",
+                "--max-connections"}, description = "Max number of TCP connections to a single broker")
+        public int maxConnections = 100;
+
+        @Parameter(names = {"-time",
+                "--test-duration"}, description = "Test duration in secs. If 0, it will keep publishing")
+        public long testTime = 0;
+
+
+        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+                "used for handling connections to brokers, default is 1 thread")
+        public int ioThreads = 1;
+
+        @Parameter(names = {"-ss",
+                "--subscriptions"}, description = "A list of subscriptions to consume on (e.g. sub1,sub2)")
+        public List<String> subscriptions = Collections.singletonList("sub");
+
+        @Parameter(names = {"-ns", "--num-subscriptions"}, description = "Number of subscriptions (per topic)")
+        public int numSubscriptions = 1;
+
+
+        @Parameter(names = {"-sp", "--subscription-position"}, description = "Subscription position")
+        private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
+
+
+        @Parameter(names = {"-st", "--subscription-type"}, description = "Subscription type")
+        public SubscriptionType subscriptionType = SubscriptionType.Exclusive;
+
+        @Parameter(names = {"-q", "--receiver-queue-size"}, description = "Size of the receiver queue")
+        public int receiverQueueSize = 1000;
+
+
+        @Parameter(names = {"-timeout", "--txn-timeout"}, description = "transaction timeout")
+        public long transactionTimeout = 5;
+
+        @Parameter(names = {"-nmp", "--numMessage-perTransaction-produce"},
+                description = "the number of messages produced by  a transaction")
+        public int numMessagesProducedPerTransaction = 1;
+        @Parameter(names = {"-nmc", "--numMessage-perTransaction-consume"},
+                description = "the number of messages consumed by  a transaction")
+        public int numMessagesReceivedPerTransaction = 1;
+
+        @Parameter(names = {"-ntnx",

Review comment:
       Typo: `-ntnx` should be `-ntxn`.

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.collect.Lists;
+
+import java.io.FileInputStream;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
+import org.HdrHistogram.Recorder;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.curator.shaded.com.google.common.util.concurrent.RateLimiter;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =
+                true)
+        public List<String> consumerTopic = Collections.singletonList("sub");
+
+        @Parameter(names = "--topic-c", description = "producer will be created to produce message to this topic",
+                required = true)
+        public List<String> producerTopic = Collections.singletonList("test");
+
+        @Parameter(names = {"-threads", "--num-test-threads"}, description = "Number of test threads")
+        public int numTestThreads = 1;
+
+
+        @Parameter(names = {"--separator"}, description = "Separator between the topic and topic number")
+        public String separator = "-";
+
+        @Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL")
+        public String adminURL;
+
+
+        @Parameter(names = {"-u", "--service-url"}, description = "Pulsar Service URL")
+        public String serviceURL;
+
+
+        @Parameter(names = {"-np",
+                "--partitions"}, description = "Create partitioned topics with the given number of partitions, set 0 "
+                + "to not try to create the topic")
+        public Integer partitions = null;
+
+        @Parameter(names = {"-c",
+                "--max-connections"}, description = "Max number of TCP connections to a single broker")
+        public int maxConnections = 100;
+
+        @Parameter(names = {"-time",
+                "--test-duration"}, description = "Test duration in secs. If 0, it will keep publishing")
+        public long testTime = 0;
+
+
+        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+                "used for handling connections to brokers, default is 1 thread")
+        public int ioThreads = 1;
+
+        @Parameter(names = {"-ss",
+                "--subscriptions"}, description = "A list of subscriptions to consume on (e.g. sub1,sub2)")
+        public List<String> subscriptions = Collections.singletonList("sub");
+
+        @Parameter(names = {"-ns", "--num-subscriptions"}, description = "Number of subscriptions (per topic)")
+        public int numSubscriptions = 1;
+
+
+        @Parameter(names = {"-sp", "--subscription-position"}, description = "Subscription position")
+        private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
+
+
+        @Parameter(names = {"-st", "--subscription-type"}, description = "Subscription type")
+        public SubscriptionType subscriptionType = SubscriptionType.Exclusive;
+
+        @Parameter(names = {"-q", "--receiver-queue-size"}, description = "Size of the receiver queue")
+        public int receiverQueueSize = 1000;
+
+
+        @Parameter(names = {"-timeout", "--txn-timeout"}, description = "transaction timeout")
+        public long transactionTimeout = 5;
+
+        @Parameter(names = {"-nmp", "--numMessage-perTransaction-produce"},
+                description = "the number of messages produced by  a transaction")
+        public int numMessagesProducedPerTransaction = 1;
+        @Parameter(names = {"-nmc", "--numMessage-perTransaction-consume"},
+                description = "the number of messages consumed by  a transaction")
+        public int numMessagesReceivedPerTransaction = 1;
+
+        @Parameter(names = {"-ntnx",
+                "--number-txn"}, description = "the number of transaction, if o, it will keep opening")
+        public long numTransactions = 0;
+
+        @Parameter(names = {"-txn", "--txn-enable"}, description = " whether transactions need to  be opened ")
+        public boolean isEnableTransaction = false;
+
+        @Parameter(names = {"-end"}, description = " how to end a transaction, commit or abort")
+        public boolean isCommitedTransaction = true;
+
+        @Parameter(names = "-txnRate", description = "the rate of transaction  open, if 0 , don`t limit")
+        public int openTxnRate = 0;
+
+
+    }
+
+    public static void main(String[] args)
+            throws IOException, PulsarAdminException, ExecutionException, InterruptedException {

Review comment:
       Declaring `throws Exception` is better here.
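
       A minimal sketch of the idea, not the PR's actual code: `loadConfig`, `awaitWorkers`, `run`, and the class name are hypothetical stand-ins for whatever `main` calls. The point is only that a single broad `throws Exception` clause covers every checked exception the body can raise, so the signature does not need to change when the body does.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;

public class ThrowsExceptionSketch {

    // Hypothetical stand-in for a call that can fail with an IOException.
    static void loadConfig(boolean fail) throws IOException {
        if (fail) {
            throw new IOException("config not readable");
        }
    }

    // Hypothetical stand-in for waiting on worker futures.
    static void awaitWorkers(boolean fail) throws ExecutionException, InterruptedException {
        if (fail) {
            throw new ExecutionException(new IllegalStateException("worker failed"));
        }
    }

    // One broad clause replaces the list of specific checked exceptions.
    static String run(boolean failConfig, boolean failWorkers) throws Exception {
        loadConfig(failConfig);
        awaitWorkers(failWorkers);
        return "done";
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(false, false)); // prints "done"
    }
}
```

       For a command-line perf tool the caller is the JVM, so nothing is lost by not enumerating the individual exception types.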

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.collect.Lists;
+
+import java.io.FileInputStream;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
+import org.HdrHistogram.Recorder;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.curator.shaded.com.google.common.util.concurrent.RateLimiter;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =
+                true)
+        public List<String> consumerTopic = Collections.singletonList("sub");
+
+        @Parameter(names = "--topic-c", description = "producer will be created to produce message to this topic",
+                required = true)
+        public List<String> producerTopic = Collections.singletonList("test");
+
+        @Parameter(names = {"-threads", "--num-test-threads"}, description = "Number of test threads")
+        public int numTestThreads = 1;
+
+
+        @Parameter(names = {"--separator"}, description = "Separator between the topic and topic number")
+        public String separator = "-";
+
+        @Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL")
+        public String adminURL;
+
+
+        @Parameter(names = {"-u", "--service-url"}, description = "Pulsar Service URL")
+        public String serviceURL;
+
+
+        @Parameter(names = {"-np",
+                "--partitions"}, description = "Create partitioned topics with the given number of partitions, set 0 "
+                + "to not try to create the topic")
+        public Integer partitions = null;
+
+        @Parameter(names = {"-c",
+                "--max-connections"}, description = "Max number of TCP connections to a single broker")
+        public int maxConnections = 100;
+
+        @Parameter(names = {"-time",
+                "--test-duration"}, description = "Test duration in secs. If 0, it will keep publishing")
+        public long testTime = 0;
+
+
+        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+                "used for handling connections to brokers, default is 1 thread")
+        public int ioThreads = 1;
+
+        @Parameter(names = {"-ss",
+                "--subscriptions"}, description = "A list of subscriptions to consume on (e.g. sub1,sub2)")
+        public List<String> subscriptions = Collections.singletonList("sub");
+
+        @Parameter(names = {"-ns", "--num-subscriptions"}, description = "Number of subscriptions (per topic)")

Review comment:
       The description should read "Number of subscriptions (per consumerTopic)".

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.collect.Lists;
+
+import java.io.FileInputStream;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
+import org.HdrHistogram.Recorder;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.curator.shaded.com.google.common.util.concurrent.RateLimiter;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =
+                true)
+        public List<String> consumerTopic = Collections.singletonList("sub");
+
+        @Parameter(names = "--topic-c", description = "producer will be created to produce message to this topic",
+                required = true)
+        public List<String> producerTopic = Collections.singletonList("test");
+
+        @Parameter(names = {"-threads", "--num-test-threads"}, description = "Number of test threads")
+        public int numTestThreads = 1;
+
+
+        @Parameter(names = {"--separator"}, description = "Separator between the topic and topic number")
+        public String separator = "-";
+
+        @Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL")
+        public String adminURL;
+
+
+        @Parameter(names = {"-u", "--service-url"}, description = "Pulsar Service URL")
+        public String serviceURL;
+
+
+        @Parameter(names = {"-np",
+                "--partitions"}, description = "Create partitioned topics with the given number of partitions, set 0 "
+                + "to not try to create the topic")
+        public Integer partitions = null;
+
+        @Parameter(names = {"-c",
+                "--max-connections"}, description = "Max number of TCP connections to a single broker")
+        public int maxConnections = 100;
+
+        @Parameter(names = {"-time",
+                "--test-duration"}, description = "Test duration in secs. If 0, it will keep publishing")
+        public long testTime = 0;
+
+
+        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+                "used for handling connections to brokers, default is 1 thread")
+        public int ioThreads = 1;
+
+        @Parameter(names = {"-ss",
+                "--subscriptions"}, description = "A list of subscriptions to consume on (e.g. sub1,sub2)")
+        public List<String> subscriptions = Collections.singletonList("sub");
+
+        @Parameter(names = {"-ns", "--num-subscriptions"}, description = "Number of subscriptions (per topic)")
+        public int numSubscriptions = 1;
+
+
+        @Parameter(names = {"-sp", "--subscription-position"}, description = "Subscription position")
+        private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
+
+
+        @Parameter(names = {"-st", "--subscription-type"}, description = "Subscription type")
+        public SubscriptionType subscriptionType = SubscriptionType.Exclusive;
+
+        @Parameter(names = {"-q", "--receiver-queue-size"}, description = "Size of the receiver queue")
+        public int receiverQueueSize = 1000;
+
+
+        @Parameter(names = {"-timeout", "--txn-timeout"}, description = "transaction timeout")
+        public long transactionTimeout = 5;
+
+        @Parameter(names = {"-nmp", "--numMessage-perTransaction-produce"},

Review comment:
       Same as the following.

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.collect.Lists;
+
+import java.io.FileInputStream;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
+import org.HdrHistogram.Recorder;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.curator.shaded.com.google.common.util.concurrent.RateLimiter;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =
+                true)
+        public List<String> consumerTopic = Collections.singletonList("sub");
+
+        @Parameter(names = "--topic-c", description = "producer will be created to produce message to this topic",
+                required = true)
+        public List<String> producerTopic = Collections.singletonList("test");
+
+        @Parameter(names = {"-threads", "--num-test-threads"}, description = "Number of test threads")
+        public int numTestThreads = 1;
+
+
+        @Parameter(names = {"--separator"}, description = "Separator between the topic and topic number")
+        public String separator = "-";
+
+        @Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL")
+        public String adminURL;
+
+
+        @Parameter(names = {"-u", "--service-url"}, description = "Pulsar Service URL")
+        public String serviceURL;
+
+
+        @Parameter(names = {"-np",
+                "--partitions"}, description = "Create partitioned topics with the given number of partitions, set 0 "
+                + "to not try to create the topic")
+        public Integer partitions = null;
+
+        @Parameter(names = {"-c",
+                "--max-connections"}, description = "Max number of TCP connections to a single broker")
+        public int maxConnections = 100;
+
+        @Parameter(names = {"-time",
+                "--test-duration"}, description = "Test duration in secs. If 0, it will keep publishing")
+        public long testTime = 0;
+
+
+        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+                "used for handling connections to brokers, default is 1 thread")
+        public int ioThreads = 1;
+
+        @Parameter(names = {"-ss",
+                "--subscriptions"}, description = "A list of subscriptions to consume on (e.g. sub1,sub2)")
+        public List<String> subscriptions = Collections.singletonList("sub");
+
+        @Parameter(names = {"-ns", "--num-subscriptions"}, description = "Number of subscriptions (per topic)")
+        public int numSubscriptions = 1;
+
+
+        @Parameter(names = {"-sp", "--subscription-position"}, description = "Subscription position")
+        private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
+
+
+        @Parameter(names = {"-st", "--subscription-type"}, description = "Subscription type")
+        public SubscriptionType subscriptionType = SubscriptionType.Exclusive;
+
+        @Parameter(names = {"-q", "--receiver-queue-size"}, description = "Size of the receiver queue")
+        public int receiverQueueSize = 1000;
+
+
+        @Parameter(names = {"-timeout", "--txn-timeout"}, description = "transaction timeout")
+        public long transactionTimeout = 5;
+
+        @Parameter(names = {"-nmp", "--numMessage-perTransaction-produce"},
+                description = "the number of messages produced by  a transaction")
+        public int numMessagesProducedPerTransaction = 1;
+        @Parameter(names = {"-nmc", "--numMessage-perTransaction-consume"},
+                description = "the number of messages consumed by  a transaction")
+        public int numMessagesReceivedPerTransaction = 1;
+
+        @Parameter(names = {"-ntnx",
+                "--number-txn"}, description = "the number of transaction, if o, it will keep opening")
+        public long numTransactions = 0;
+
+        @Parameter(names = {"-txn", "--txn-enable"}, description = " whether transactions need to  be opened ")
+        public boolean isEnableTransaction = false;
+
+        @Parameter(names = {"-end"}, description = " how to end a transaction, commit or abort")
+        public boolean isCommitedTransaction = true;
+
+        @Parameter(names = "-txnRate", description = "the rate of transaction  open, if 0 , don`t limit")
+        public int openTxnRate = 0;
+
+
+    }
+
+    public static void main(String[] args)
+            throws IOException, PulsarAdminException, ExecutionException, InterruptedException {
+        final Arguments arguments = new Arguments();
+        JCommander jc = new JCommander(arguments);
+        jc.setProgramName("pulsar-perf produce");
+
+        try {
+            jc.parse(args);
+        } catch (ParameterException e) {
+            System.out.println(e.getMessage());
+            jc.usage();
+            PerfClientUtils.exit(-1);
+        }
+
+        if (arguments.help) {
+            jc.usage();
+            PerfClientUtils.exit(-1);
+        }
+
+
+        if (arguments.confFile != null) {
+            Properties prop = new Properties(System.getProperties());
+            prop.load(new FileInputStream(arguments.confFile));
+
+            if (arguments.serviceURL == null) {
+                arguments.serviceURL = prop.getProperty("brokerServiceUrl");
+            }
+
+            if (arguments.serviceURL == null) {
+                arguments.serviceURL = prop.getProperty("webServiceUrl");
+            }
+
+            // fallback to previous-version serviceUrl property to maintain backward-compatibility
+            if (arguments.serviceURL == null) {
+                arguments.serviceURL = prop.getProperty("serviceUrl", "http://localhost:8080/");
+            }
+
+            if (arguments.adminURL == null) {
+                arguments.adminURL = prop.getProperty("webServiceUrl");
+            }
+            if (arguments.adminURL == null) {
+                arguments.adminURL = prop.getProperty("adminURL", "http://localhost:8080/");
+            }
+        }
+
+
+        // Dump config variables
+        PerfClientUtils.printJVMInformation(log);
+
+        ObjectMapper m = new ObjectMapper();
+        ObjectWriter w = m.writerWithDefaultPrettyPrinter();
+        log.info("Starting Pulsar perf transaction with config: {}", w.writeValueAsString(arguments));
+
+        final byte[] payloadBytes = new byte[1024];
+        Random random = new Random(0);
+        for (int i = 0; i < payloadBytes.length; ++i) {
+            payloadBytes[i] = (byte) (random.nextInt(26) + 65);
+        }
+        if (arguments.partitions != null) {
+            PulsarAdminBuilder clientBuilder = PulsarAdmin.builder()
+                    .serviceHttpUrl(arguments.adminURL);
+
+
+            try (PulsarAdmin client = clientBuilder.build()) {
+                for (String topic : arguments.producerTopic) {
+                    log.info("Creating  produce partitioned topic {} with {} partitions", topic, arguments.partitions);
+                    try {
+                        client.topics().createPartitionedTopic(topic, arguments.partitions);
+                    } catch (PulsarAdminException.ConflictException alreadyExists) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Topic {} already exists: {}", topic, alreadyExists);
+                        }
+                        PartitionedTopicMetadata partitionedTopicMetadata =
+                                client.topics().getPartitionedTopicMetadata(topic);
+                        if (partitionedTopicMetadata.partitions != arguments.partitions) {
+                            log.error(
+                                    "Topic {} already exists but it has a wrong number of partitions: {}, expecting {}",
+                                    topic, partitionedTopicMetadata.partitions, arguments.partitions);
+                            PerfClientUtils.exit(-1);
+                        }
+                    }
+                }
+                for (String topic : arguments.consumerTopic) {
+                    log.info("Creating  consume partitioned topic {} with {} partitions", topic, arguments.partitions);
+                    try {
+                        client.topics().createPartitionedTopic(topic, arguments.partitions);
+                    } catch (PulsarAdminException.ConflictException alreadyExists) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Topic {} already exists: {}", topic, alreadyExists);
+                        }
+                        PartitionedTopicMetadata partitionedTopicMetadata =
+                                client.topics().getPartitionedTopicMetadata(topic);
+                        if (partitionedTopicMetadata.partitions != arguments.partitions) {
+                            log.error(
+                                    "Topic {} already exists but it has a wrong number of partitions: {}, expecting {}",
+                                    topic, partitionedTopicMetadata.partitions, arguments.partitions);
+                            PerfClientUtils.exit(-1);
+                        }
+                    }
+                }
+            }
+        }
+
+
+        PulsarClient client =
+                PulsarClient.builder().enableTransaction(arguments.isEnableTransaction).serviceUrl(arguments.serviceURL)
+                        .connectionsPerBroker(arguments.maxConnections)
+                        .ioThreads(arguments.ioThreads)
+                        .statsInterval(0, TimeUnit.SECONDS)
+                        .ioThreads(arguments.ioThreads).build();
+
+        ProducerBuilder<byte[]> producerBuilder = client.newProducer()//
+                .sendTimeout(0, TimeUnit.SECONDS);

Review comment:
       When transactions are enabled, the producer should set .sendTimeout(0, TimeUnit.SECONDS);
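
One possible reading of the comment above is that the zero send timeout belongs with the transaction flag instead of being applied unconditionally. The sketch below illustrates that reading only. `FakeProducerBuilder` and `configure` are hypothetical stand-ins, not Pulsar classes, and the 30-second starting value is an assumption mirroring the client's default send timeout.

```java
import java.util.concurrent.TimeUnit;

class SendTimeoutSketch {

    // Hypothetical stand-in for a producer builder; only the one setter used here is modeled.
    static class FakeProducerBuilder {
        // Assumed default of 30 seconds, mirroring the client's default send timeout.
        long sendTimeoutMillis = TimeUnit.SECONDS.toMillis(30);

        FakeProducerBuilder sendTimeout(int timeout, TimeUnit unit) {
            this.sendTimeoutMillis = unit.toMillis(timeout);
            return this;
        }
    }

    // Disable the send timeout only when transactions are enabled;
    // otherwise leave the builder's existing value untouched.
    static FakeProducerBuilder configure(FakeProducerBuilder builder, boolean enableTransaction) {
        if (enableTransaction) {
            builder.sendTimeout(0, TimeUnit.SECONDS);
        }
        return builder;
    }

    public static void main(String[] args) {
        System.out.println(configure(new FakeProducerBuilder(), true).sendTimeoutMillis);
        System.out.println(configure(new FakeProducerBuilder(), false).sendTimeoutMillis);
    }
}
```

In the real code the same `if` would wrap the call on `producerBuilder`, keyed on `arguments.isEnableTransaction`.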

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.collect.Lists;
+
+import java.io.FileInputStream;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
+import org.HdrHistogram.Recorder;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.curator.shaded.com.google.common.util.concurrent.RateLimiter;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =
+                true)
+        public List<String> consumerTopic = Collections.singletonList("sub");
+
+        @Parameter(names = "--topic-c", description = "producer will be created to produce message to this topic",
+                required = true)
+        public List<String> producerTopic = Collections.singletonList("test");
+
+        @Parameter(names = {"-threads", "--num-test-threads"}, description = "Number of test threads")
+        public int numTestThreads = 1;
+
+
+        @Parameter(names = {"--separator"}, description = "Separator between the topic and topic number")
+        public String separator = "-";
+
+        @Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL")
+        public String adminURL;
+
+
+        @Parameter(names = {"-u", "--service-url"}, description = "Pulsar Service URL")
+        public String serviceURL;
+
+
+        @Parameter(names = {"-np",
+                "--partitions"}, description = "Create partitioned topics with the given number of partitions, set 0 "
+                + "to not try to create the topic")
+        public Integer partitions = null;
+
+        @Parameter(names = {"-c",
+                "--max-connections"}, description = "Max number of TCP connections to a single broker")
+        public int maxConnections = 100;
+
+        @Parameter(names = {"-time",
+                "--test-duration"}, description = "Test duration in secs. If 0, it will keep publishing")
+        public long testTime = 0;
+
+
+        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+                "used for handling connections to brokers, default is 1 thread")
+        public int ioThreads = 1;
+
+        @Parameter(names = {"-ss",
+                "--subscriptions"}, description = "A list of subscriptions to consume on (e.g. sub1,sub2)")
+        public List<String> subscriptions = Collections.singletonList("sub");
+
+        @Parameter(names = {"-ns", "--num-subscriptions"}, description = "Number of subscriptions (per topic)")
+        public int numSubscriptions = 1;
+
+
+        @Parameter(names = {"-sp", "--subscription-position"}, description = "Subscription position")
+        private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
+
+
+        @Parameter(names = {"-st", "--subscription-type"}, description = "Subscription type")
+        public SubscriptionType subscriptionType = SubscriptionType.Exclusive;
+
+        @Parameter(names = {"-q", "--receiver-queue-size"}, description = "Size of the receiver queue")
+        public int receiverQueueSize = 1000;
+
+
+        @Parameter(names = {"-timeout", "--txn-timeout"}, description = "transaction timeout")
+        public long transactionTimeout = 5;
+
+        @Parameter(names = {"-nmp", "--numMessage-perTransaction-produce"},
+                description = "the number of messages produced by  a transaction")
+        public int numMessagesProducedPerTransaction = 1;
+        @Parameter(names = {"-nmc", "--numMessage-perTransaction-consume"},
+                description = "the number of messages consumed by  a transaction")
+        public int numMessagesReceivedPerTransaction = 1;
+
+        @Parameter(names = {"-ntnx",
+                "--number-txn"}, description = "the number of transaction, if o, it will keep opening")
+        public long numTransactions = 0;
+
+        @Parameter(names = {"-txn", "--txn-enable"}, description = " whether transactions need to  be opened ")

Review comment:
       A description such as "Enable or disable the transaction" would be better.

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
##########
@@ -0,0 +1,612 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.collect.Lists;
+
+import java.io.FileInputStream;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
+import org.HdrHistogram.Recorder;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.curator.shaded.com.google.common.util.concurrent.RateLimiter;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerformanceTransaction {
+
+
+    private static final LongAdder totalNumTransaction = new LongAdder();
+    private static final LongAdder numTransaction = new LongAdder();
+
+    private static final LongAdder totalMessageAck = new LongAdder();
+    private static final LongAdder messageAck = new LongAdder();
+
+
+    private static Recorder messageAckRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageAckCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+    private static Recorder messageSendRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+    private static Recorder messageSendRCumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+
+
+    @Parameters(commandDescription = "Test pulsar transaction performance.")
+    static class Arguments {
+
+        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = {"--conf-file"}, description = "Configuration file")
+        public String confFile;
+
+        @Parameter(names = "--topic-c", description = "consumer will be created to consumer this topic", required =
+                true)
+        public List<String> consumerTopic = Collections.singletonList("sub");
+
+        @Parameter(names = "--topic-c", description = "producer will be created to produce message to this topic",
+                required = true)
+        public List<String> producerTopic = Collections.singletonList("test");
+
+        @Parameter(names = {"-threads", "--num-test-threads"}, description = "Number of test threads")
+        public int numTestThreads = 1;
+
+
+
+        @Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL")
+        public String adminURL;
+
+
+        @Parameter(names = {"-u", "--service-url"}, description = "Pulsar Service URL")
+        public String serviceURL;
+
+
+        @Parameter(names = {"-np",
+                "--partitions"}, description = "Create partitioned topics with the given number of partitions, set 0 "
+                + "to not try to create the topic")
+        public Integer partitions = null;
+
+        @Parameter(names = {"-c",
+                "--max-connections"}, description = "Max number of TCP connections to a single broker")
+        public int maxConnections = 100;
+
+        @Parameter(names = {"-time",
+                "--test-duration"}, description = "Test duration in secs. If 0, it will keep publishing")
+        public long testTime = 0;
+
+
+        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+                "used for handling connections to brokers, default is 1 thread")
+        public int ioThreads = 1;
+
+        @Parameter(names = {"-ss",
+                "--subscriptions"}, description = "A list of subscriptions to consume on (e.g. sub1,sub2)")
+        public List<String> subscriptions = Collections.singletonList("sub");
+
+        @Parameter(names = {"-ns", "--num-subscriptions"}, description = "Number of subscriptions (per topic)")
+        public int numSubscriptions = 1;
+
+
+        @Parameter(names = {"-sp", "--subscription-position"}, description = "Subscription position")
+        private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
+
+
+        @Parameter(names = {"-st", "--subscription-type"}, description = "Subscription type")
+        public SubscriptionType subscriptionType = SubscriptionType.Exclusive;
+
+        @Parameter(names = {"-q", "--receiver-queue-size"}, description = "Size of the receiver queue")
+        public int receiverQueueSize = 1000;
+
+
+        @Parameter(names = {"-timeout", "--txn-timeout"}, description = "transaction timeout")
+        public long transactionTimeout = 5;
+
+        @Parameter(names = {"-nmp", "--numMessage-perTransaction-produce"},
+                description = "the number of messages produced by  a transaction")
+        public int numMessagesProducedPerTransaction = 1;
+        @Parameter(names = {"-nmc", "--numMessage-perTransaction-consume"},
+                description = "the number of messages consumed by  a transaction")
+        public int numMessagesReceivedPerTransaction = 1;
+
+        @Parameter(names = {"-ntnx",
+                "--number-txn"}, description = "the number of transaction, if o, it will keep opening")
+        public long numTransactions = 0;
+
+        @Parameter(names = {"-txn", "--txn-enable"}, description = " whether transactions need to  be opened ")
+        public boolean isEnableTransaction = false;
+
+        @Parameter(names = {"-end"}, description = " how to end a transaction, commit or abort")
+        public boolean isCommitedTransaction = true;
+
+        @Parameter(names = "-txnRate", description = "the rate of transaction  open, if 0 , don`t limit")
+        public int openTxnRate = 0;
+
+
+    }
+
+    public static void main(String[] args)
+            throws IOException, PulsarAdminException, ExecutionException, InterruptedException {
+        final Arguments arguments = new Arguments();
+        JCommander jc = new JCommander(arguments);
+        jc.setProgramName("pulsar-perf produce");
+
+        try {
+            jc.parse(args);
+        } catch (ParameterException e) {
+            System.out.println(e.getMessage());
+            jc.usage();
+            PerfClientUtils.exit(-1);
+        }
+
+        if (arguments.help) {
+            jc.usage();
+            PerfClientUtils.exit(-1);
+        }
+
+
+        if (arguments.confFile != null) {
+            Properties prop = new Properties(System.getProperties());
+            prop.load(new FileInputStream(arguments.confFile));
+
+            if (arguments.serviceURL == null) {
+                arguments.serviceURL = prop.getProperty("brokerServiceUrl");
+            }
+
+            if (arguments.serviceURL == null) {
+                arguments.serviceURL = prop.getProperty("webServiceUrl");
+            }
+
+            // fallback to previous-version serviceUrl property to maintain backward-compatibility
+            if (arguments.serviceURL == null) {
+                arguments.serviceURL = prop.getProperty("serviceUrl", "http://localhost:8080/");
+            }
+
+            if (arguments.adminURL == null) {
+                arguments.adminURL = prop.getProperty("webServiceUrl");
+            }
+            if (arguments.adminURL == null) {
+                arguments.adminURL = prop.getProperty("adminURL", "http://localhost:8080/");
+            }
+        }
+
+
+        // Dump config variables
+        PerfClientUtils.printJVMInformation(log);
+
+        ObjectMapper m = new ObjectMapper();
+        ObjectWriter w = m.writerWithDefaultPrettyPrinter();
+        log.info("Starting Pulsar perf transaction with config: {}", w.writeValueAsString(arguments));
+
+        final byte[] payloadBytes = new byte[1024];
+        Random random = new Random(0);
+        for (int i = 0; i < payloadBytes.length; ++i) {
+            payloadBytes[i] = (byte) (random.nextInt(26) + 65);
+        }
+        if (arguments.partitions != null) {
+            PulsarAdminBuilder clientBuilder = PulsarAdmin.builder()
+                    .serviceHttpUrl(arguments.adminURL);
+
+
+            try (PulsarAdmin client = clientBuilder.build()) {
+                for (String topic : arguments.producerTopic) {
+                    log.info("Creating  produce partitioned topic {} with {} partitions", topic, arguments.partitions);
+                    try {
+                        client.topics().createPartitionedTopic(topic, arguments.partitions);
+                    } catch (PulsarAdminException.ConflictException alreadyExists) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Topic {} already exists: {}", topic, alreadyExists);
+                        }
+                        PartitionedTopicMetadata partitionedTopicMetadata =
+                                client.topics().getPartitionedTopicMetadata(topic);
+                        if (partitionedTopicMetadata.partitions != arguments.partitions) {
+                            log.error(
+                                    "Topic {} already exists but it has a wrong number of partitions: {}, expecting {}",
+                                    topic, partitionedTopicMetadata.partitions, arguments.partitions);
+                            PerfClientUtils.exit(-1);
+                        }
+                    }
+                }
+                for (String topic : arguments.consumerTopic) {
+                    log.info("Creating  consume partitioned topic {} with {} partitions", topic, arguments.partitions);
+                    try {
+                        client.topics().createPartitionedTopic(topic, arguments.partitions);
+                    } catch (PulsarAdminException.ConflictException alreadyExists) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Topic {} already exists: {}", topic, alreadyExists);
+                        }
+                        PartitionedTopicMetadata partitionedTopicMetadata =
+                                client.topics().getPartitionedTopicMetadata(topic);
+                        if (partitionedTopicMetadata.partitions != arguments.partitions) {
+                            log.error(
+                                    "Topic {} already exists but it has a wrong number of partitions: {}, expecting {}",
+                                    topic, partitionedTopicMetadata.partitions, arguments.partitions);
+                            PerfClientUtils.exit(-1);
+                        }
+                    }
+                }
+            }
+        }
+
+
+        PulsarClient client =
+                PulsarClient.builder().enableTransaction(arguments.isEnableTransaction).serviceUrl(arguments.serviceURL)
+                        .connectionsPerBroker(arguments.maxConnections)
+                        .ioThreads(arguments.ioThreads)
+                        .statsInterval(0, TimeUnit.SECONDS)
+                        .ioThreads(arguments.ioThreads).build();
+
+        ProducerBuilder<byte[]> producerBuilder = client.newProducer()//
+                .sendTimeout(0, TimeUnit.SECONDS);
+
+        final List<Future<Producer<byte[]>>> producerFutures = Lists.newArrayList();
+
+
+        ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer(Schema.BYTES) //
+                .subscriptionType(arguments.subscriptionType)
+                .receiverQueueSize(arguments.receiverQueueSize)
+                .subscriptionInitialPosition(arguments.subscriptionInitialPosition)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+
+
+        Iterator<String> consumerTopicIterator = arguments.consumerTopic.iterator();
+        Iterator<String> producerTopicIterator = arguments.producerTopic.iterator();
+
+        final List<List<Consumer<byte[]>>> consumers = Lists.newArrayList();
+        for (int i = 0; i < arguments.numTestThreads; i++) {
+            final List<Consumer<byte[]>> subscriptions = Lists.newArrayListWithCapacity(arguments.numSubscriptions);
+            final List<Future<Consumer<byte[]>>> subscriptionFutures =
+                    Lists.newArrayListWithCapacity(arguments.numSubscriptions);
+            if (consumerTopicIterator.hasNext()) {

Review comment:
       As I understand it, each thread needs to create the corresponding number of consumers and producers, so this `if` should probably be a `while` loop.
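
A minimal sketch of the concern, under stated assumptions: with `if (consumerTopicIterator.hasNext())` on an iterator shared across the thread loop, thread i takes at most one topic and later threads may get none. The hypothetical `assign` helper below uses plain strings in place of Pulsar consumers and walks every topic for every thread. It uses a fresh iteration per thread, because a `while` loop on the shared iterator would let the first thread drain it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PerThreadSubscriptionSketch {

    // Returns, for each test thread, the "topic/subscription" pairs it would subscribe to.
    // Strings stand in for real consumers; nothing here talks to a broker.
    static List<List<String>> assign(int numTestThreads, List<String> topics, List<String> subscriptions) {
        List<List<String>> perThread = new ArrayList<>();
        for (int i = 0; i < numTestThreads; i++) {
            List<String> assigned = new ArrayList<>();
            // A for-each loop obtains a fresh iterator for each thread,
            // so every thread visits every topic.
            for (String topic : topics) {
                for (String subscription : subscriptions) {
                    assigned.add(topic + "/" + subscription);
                }
            }
            perThread.add(assigned);
        }
        return perThread;
    }

    public static void main(String[] args) {
        System.out.println(assign(2, Arrays.asList("t1", "t2"), Arrays.asList("sub")));
    }
}
```

Whether every thread should really attach to every topic depends on the subscription type. An Exclusive subscription admits only one consumer, so the real fix may need a different split of topics across threads.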