You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/09 14:57:29 UTC

[GitHub] [kafka] vamossagar12 opened a new pull request, #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

vamossagar12 opened a new pull request, #13095:
URL: https://github.com/apache/kafka/pull/13095

   Move EndToEndLatency to tools


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1070621323


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            //Results
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> topicPartitions = consumer.
+                partitionsFor(topic).
+                stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                .collect(Collectors.toList());
+        consumer.assign(topicPartitions);
+        consumer.seekToEnd(topicPartitions);
+        consumer.assignment().forEach(consumer::position);
+    }
+
+    private static void printResults(int numMessages, double totalTime, long[] latencies) {
+        System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages / 1000.0 / 1000.0);
+        Arrays.sort(latencies);
+        int p50 = (int) latencies[(int) (latencies.length * 0.5)];
+        int p99 = (int) latencies[(int) (latencies.length * 0.99)];
+        int p999 = (int) latencies[(int) (latencies.length * 0.999)];
+        System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d", p50, p99, p999);
+    }
+
+    private static byte[] randomBytesOfLen(Random random, int length) {
+        byte[] randomBytes = new byte[length];
+        Arrays.fill(randomBytes, Integer.valueOf(random.nextInt(26) + 65).byteValue());
+        return randomBytes;
+    }
+
+    private static void createTopic(Properties props, String topic) {
+
+        short defaultReplicationFactor = 1;
+        int defaultNumPartitions = 1;
+
+        System.out.printf("Topic \"%s\" does not exist. " +
+                        "Will create topic with %d partition(s) and replication factor = %d%n",
+                topic, defaultNumPartitions, defaultReplicationFactor);
+
+        Admin adminClient = Admin.create(props);
+        NewTopic newTopic = new NewTopic(topic, defaultNumPartitions, defaultReplicationFactor);
+        adminClient.createTopics(Collections.singletonList(newTopic));
+        try {
+            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+        } catch (ExecutionException | InterruptedException e) {
+            System.out.printf("Creation of topic %s failed", topic);
+            throw new RuntimeException(e);
+        } finally {
+            Utils.closeQuietly(adminClient, "AdminClient");
+        }
+    }
+
+    private static Properties loadPropsWithBootstrapServers(String propertiesFile) throws IOException {
+        return propertiesFile != null ? Utils.loadProps(propertiesFile) : new Properties();
+    }
+
+    private static KafkaConsumer<byte[], byte[]> createKafkaConsumer(Properties properties, String brokers) {
+        Properties consumerProps = new Properties(properties);
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

Review Comment:
   Can you move this to the shared `loadPropsWithBootstrapServers` as it was originally?



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {

Review Comment:
   I think it would be cleaner to move this if-block to the `execute` method and raise a `TerseException` with the usage message.



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {

Review Comment:
   It would be much clearer if you moved the exception handling inside `loadPropsWithBootstrapServers`. That way you could use this method directly inside `createKafkaConsumer`, `createKafkaProducer` and `createTopic`.



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));

Review Comment:
   Why are you not using the iterator as in the original code?



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;

Review Comment:
   This is not equivalent to the original code; some logic (the filter) is missing.



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            //Results
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> topicPartitions = consumer.
+                partitionsFor(topic).
+                stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                .collect(Collectors.toList());
+        consumer.assign(topicPartitions);
+        consumer.seekToEnd(topicPartitions);
+        consumer.assignment().forEach(consumer::position);
+    }
+
+    private static void printResults(int numMessages, double totalTime, long[] latencies) {
+        System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages / 1000.0 / 1000.0);
+        Arrays.sort(latencies);
+        int p50 = (int) latencies[(int) (latencies.length * 0.5)];
+        int p99 = (int) latencies[(int) (latencies.length * 0.99)];
+        int p999 = (int) latencies[(int) (latencies.length * 0.999)];
+        System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d", p50, p99, p999);
+    }
+
+    private static byte[] randomBytesOfLen(Random random, int length) {
+        byte[] randomBytes = new byte[length];
+        Arrays.fill(randomBytes, Integer.valueOf(random.nextInt(26) + 65).byteValue());
+        return randomBytes;
+    }
+
+    private static void createTopic(Properties props, String topic) {
+

Review Comment:
   Can we remove this extra line?



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            //Results

Review Comment:
   Do we really need this comment after your refactoring?



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            //Results
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> topicPartitions = consumer.
+                partitionsFor(topic).
+                stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                .collect(Collectors.toList());
+        consumer.assign(topicPartitions);
+        consumer.seekToEnd(topicPartitions);
+        consumer.assignment().forEach(consumer::position);
+    }
+
+    private static void printResults(int numMessages, double totalTime, long[] latencies) {
+        System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages / 1000.0 / 1000.0);
+        Arrays.sort(latencies);
+        int p50 = (int) latencies[(int) (latencies.length * 0.5)];
+        int p99 = (int) latencies[(int) (latencies.length * 0.99)];
+        int p999 = (int) latencies[(int) (latencies.length * 0.999)];
+        System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d", p50, p99, p999);

Review Comment:
   We are missing a trailing line break (`%n`) here.



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            //Results
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> topicPartitions = consumer.
+                partitionsFor(topic).
+                stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                .collect(Collectors.toList());
+        consumer.assign(topicPartitions);
+        consumer.seekToEnd(topicPartitions);
+        consumer.assignment().forEach(consumer::position);
+    }
+
+    private static void printResults(int numMessages, double totalTime, long[] latencies) {
+        System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages / 1000.0 / 1000.0);
+        Arrays.sort(latencies);
+        int p50 = (int) latencies[(int) (latencies.length * 0.5)];
+        int p99 = (int) latencies[(int) (latencies.length * 0.99)];
+        int p999 = (int) latencies[(int) (latencies.length * 0.999)];
+        System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d", p50, p99, p999);
+    }
+
+    private static byte[] randomBytesOfLen(Random random, int length) {
+        byte[] randomBytes = new byte[length];
+        Arrays.fill(randomBytes, Integer.valueOf(random.nextInt(26) + 65).byteValue());
+        return randomBytes;
+    }
+
+    private static void createTopic(Properties props, String topic) {
+
+        short defaultReplicationFactor = 1;
+        int defaultNumPartitions = 1;
+
+        System.out.printf("Topic \"%s\" does not exist. " +
+                        "Will create topic with %d partition(s) and replication factor = %d%n",
+                topic, defaultNumPartitions, defaultReplicationFactor);
+
+        Admin adminClient = Admin.create(props);
+        NewTopic newTopic = new NewTopic(topic, defaultNumPartitions, defaultReplicationFactor);
+        adminClient.createTopics(Collections.singletonList(newTopic));
+        try {
+            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+        } catch (ExecutionException | InterruptedException e) {
+            System.out.printf("Creation of topic %s failed", topic);
+            throw new RuntimeException(e);
+        } finally {
+            Utils.closeQuietly(adminClient, "AdminClient");
+        }
+    }
+
+    private static Properties loadPropsWithBootstrapServers(String propertiesFile) throws IOException {
+        return propertiesFile != null ? Utils.loadProps(propertiesFile) : new Properties();
+    }
+
+    private static KafkaConsumer<byte[], byte[]> createKafkaConsumer(Properties properties, String brokers) {
+        Properties consumerProps = new Properties(properties);
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + System.currentTimeMillis());
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0"); //ensure we have no temporal batching
+        return new KafkaConsumer<>(consumerProps);
+    }
+
+    private static KafkaProducer<byte[], byte[]> createKafkaProducer(Properties properties, String acks, String brokers) {
+        Properties producerProps = new Properties(properties);
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

Review Comment:
   Can you move this to the shared `loadPropsWithBootstrapServers` as it was originally?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1069628843


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+    private final static long TIMEOUT = 60000;
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+
+        ArgumentParser parser = addArguments();
+
+        Namespace res = null;
+        try {
+            res = parser.parseArgs(args);
+        } catch (ArgumentParserException e) {
+            if (args.length == 0) {
+                parser.printHelp();
+                Exit.exit(0);
+            } else {
+                parser.handleError(e);
+                Exit.exit(1);
+            }
+        }
+
+        String brokers = res.getString("broker_list");
+        String topic = res.getString("topic");
+        int numMessages = res.getInt("num_messages");
+        String acks = res.getString("producer_acks");
+        int messageSizeBytes = res.getInt("message_size_bytes");
+        String propertiesFile = res.getString("properties_file");
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {

Review Comment:
   Here you have consumer properties passed to the producer instance with warnings printed out to the console. That's not what the original code does.



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+    private final static long TIMEOUT = 60000;

Review Comment:
   Why is this not at the start of the class? Can we rename it to `POLL_TIMEOUT_MS`?



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+    private final static long TIMEOUT = 60000;
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+
+        ArgumentParser parser = addArguments();
+
+        Namespace res = null;
+        try {
+            res = parser.parseArgs(args);
+        } catch (ArgumentParserException e) {
+            if (args.length == 0) {
+                parser.printHelp();
+                Exit.exit(0);
+            } else {
+                parser.handleError(e);
+                Exit.exit(1);
+            }
+        }
+
+        String brokers = res.getString("broker_list");
+        String topic = res.getString("topic");
+        int numMessages = res.getInt("num_messages");
+        String acks = res.getString("producer_acks");
+        int messageSizeBytes = res.getInt("message_size_bytes");
+        String propertiesFile = res.getString("properties_file");
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(TIMEOUT));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            //Results
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+
+    }
+
+    static ArgumentParser addArguments() {
+
+        ArgumentParser parser = ArgumentParsers
+                .newArgumentParser("end-to-end-latency")
+                .defaultHelp(true)
+                .description("This tool records the average end to end latency for a single message to travel through Kafka");
+
+
+        parser
+                .addArgument("-b", "--brokers")
+                .action(store())
+                .type(String.class)
+                .dest("broker_list")
+                .help("The location of the bootstrap broker for both the producer and the consumer");
+
+        parser
+                .addArgument("-t", "--topic")
+                .action(store())
+                .type(String.class)
+                .dest("topic")
+                .help("The Kakfa topic to send/receive messages to/from");
+
+        parser
+                .addArgument("-n", "--num-records")
+                .action(store())
+                .type(Integer.class)
+                .dest("num_messages")
+                .help("The number of messages to send");
+
+        parser
+                .addArgument("-a", "--acks")
+                .action(store())
+                .type(String.class)
+                .dest("producer_acks")
+                .help("The number of messages to send");
+
+        parser
+                .addArgument("-s", "--message-bytes")
+                .required(true)
+                .action(store())
+                .type(Integer.class)
+                .dest("message_size_bytes")
+                .help("Size of each message in bytes");
+
+        parser
+                .addArgument("-f", "--properties-file")
+                .action(store())
+                .type(String.class)
+                .dest("properties_file");
+
+        return parser;
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + TIMEOUT + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> topicPartitions = consumer.
+                partitionsFor(topic).
+                stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                .collect(Collectors.toList());
+        consumer.assign(topicPartitions);
+        consumer.seekToEnd(topicPartitions);
+        consumer.assignment().forEach(consumer::position);
+    }
+
+    private static void printResults(int numMessages, double totalTime, long[] latencies) {
+        System.out.printf("Avg latency: %.4f ms\n", totalTime / numMessages / 1000.0 / 1000.0);
+        Arrays.sort(latencies);
+        int p50 = (int) latencies[(int) (latencies.length * 0.5)];
+        int p99 = (int) latencies[(int) (latencies.length * 0.99)];
+        int p999 = (int) latencies[(int) (latencies.length * 0.999)];
+        System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d", p50, p99, p999);
+    }
+
+    private static byte[] randomBytesOfLen(Random random, int length) {
+        byte[] randomBytes = new byte[length];
+        Arrays.fill(randomBytes, new Integer(random.nextInt(26) + 65).byteValue());

Review Comment:
   Looks like this `Integer` constructor is deprecated; you should use `Integer.valueOf` instead.



##########
tests/kafkatest/services/performance/end_to_end_latency.py:
##########
@@ -87,7 +87,7 @@ def start_cmd(self, node):
         cmd = "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % EndToEndLatencyService.LOG4J_CONFIG
         if node.version.consumer_supports_bootstrap_server():
             cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s %(java_class_name)s " % args
-            cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(config_file)s" % args
+            cmd += "-b %(bootstrap_servers)s -t %(topic)s -n %(num_records)d -a %(acks)d -s %(message_bytes)d -f %(config_file)s" % args

Review Comment:
   This is completely changing the interface of this tool, not just the package name. Why do we need to introduce `argparse4j` if it wasn't used in the original code? I would stay as close as possible to the original.



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+    private final static long TIMEOUT = 60000;
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+

Review Comment:
   Can we get rid of these extra blank lines at the start/end of methods? There are quite a few of them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1381703690

   Looks like there are some more checkstyle failures. Will fix them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1088161412


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        if (args.length != 5 && args.length != 6) {
+            throw new TerseException("USAGE: java " + EndToEndLatency.class.getName()
+                    + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+        }
+
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        Optional<String> propertiesFile = args.length > 5 ? (Utils.isBlank(args[5]) ? Optional.empty() : Optional.of(args[5])) : Optional.empty();
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(propertiesFile, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(propertiesFile, brokers, acks)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(propertiesFile, brokers, topic);
+            }
+            setupConsumer(topic, consumer);
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> topicPartitions = consumer.
+                partitionsFor(topic).
+                stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                .collect(Collectors.toList());
+        consumer.assign(topicPartitions);
+        consumer.seekToEnd(topicPartitions);
+        consumer.assignment().forEach(consumer::position);
+    }
+
+    private static void printResults(int numMessages, double totalTime, long[] latencies) {
+        System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages / 1000.0 / 1000.0);
+        Arrays.sort(latencies);
+        int p50 = (int) latencies[(int) (latencies.length * 0.5)];
+        int p99 = (int) latencies[(int) (latencies.length * 0.99)];
+        int p999 = (int) latencies[(int) (latencies.length * 0.999)];
+        System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d%n", p50, p99, p999);
+    }
+
+    private static byte[] randomBytesOfLen(Random random, int length) {
+        byte[] randomBytes = new byte[length];
+        Arrays.fill(randomBytes, Integer.valueOf(random.nextInt(26) + 65).byteValue());
+        return randomBytes;
+    }
+
+    private static void createTopic(Optional<String> propertiesFile, String brokers, String topic) throws IOException {
+        short defaultReplicationFactor = 1;
+        int defaultNumPartitions = 1;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1383104841

   @fvaleri, I removed the code changes related to argparse4j. That way the interface of the tool is exactly the same as it was previously. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1072366317


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;

Review Comment:
   Thanks @ijuma, @fvaleri. I made the changes. Let me know how it's looking now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison merged pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison merged PR #13095:
URL: https://github.com/apache/kafka/pull/13095


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1404639472

   Hi @mimaison, would you please review this PR whenever you get the chance? It's already approved by @fvaleri. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1375847369

   Also, I'm currently focused on completing KAFKA-14470. @mimaison since you fleshed out KAFKA-14525, do you have cycles to do these reviews?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1408469183

   > I simply meant that it looks like some changes in the system tests are required too.
   
   Got it. Thanks for the confirmation. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1072496029


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        if (args.length != 5 && args.length != 6) {
+            throw new TerseException("USAGE: java " + EndToEndLatency.class.getName()
+                    + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+        }
+
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        Optional<String> propertiesFile = args.length > 5 ? (Utils.isBlank(args[5]) ? Optional.empty() : Optional.of(args[5])) : Optional.empty();
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(propertiesFile, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(propertiesFile, brokers, acks)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(propertiesFile, brokers, topic);
+            }
+            setupConsumer(topic, consumer);
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> topicPartitions = consumer.
+                partitionsFor(topic).
+                stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                .collect(Collectors.toList());
+        consumer.assign(topicPartitions);
+        consumer.seekToEnd(topicPartitions);
+        consumer.assignment().forEach(consumer::position);
+    }
+
+    private static void printResults(int numMessages, double totalTime, long[] latencies) {
+        System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages / 1000.0 / 1000.0);
+        Arrays.sort(latencies);
+        int p50 = (int) latencies[(int) (latencies.length * 0.5)];
+        int p99 = (int) latencies[(int) (latencies.length * 0.99)];
+        int p999 = (int) latencies[(int) (latencies.length * 0.999)];
+        System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d%n", p50, p99, p999);
+    }
+
+    private static byte[] randomBytesOfLen(Random random, int length) {
+        byte[] randomBytes = new byte[length];
+        Arrays.fill(randomBytes, Integer.valueOf(random.nextInt(26) + 65).byteValue());
+        return randomBytes;
+    }
+
+    private static void createTopic(Optional<String> propertiesFile, String brokers, String topic) throws IOException {
+        short defaultReplicationFactor = 1;
+        int defaultNumPartitions = 1;
+
+        System.out.printf("Topic \"%s\" does not exist. " +
+                        "Will create topic with %d partition(s) and replication factor = %d%n",
+                topic, defaultNumPartitions, defaultReplicationFactor);
+
+        Properties adminProps = loadPropsWithBootstrapServers(propertiesFile, brokers);
+        Admin adminClient = Admin.create(adminProps);
+        NewTopic newTopic = new NewTopic(topic, defaultNumPartitions, defaultReplicationFactor);
+        adminClient.createTopics(Collections.singletonList(newTopic));

Review Comment:
   Sorry, that was an oversight on my part. I have removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1071348242


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;

Review Comment:
   You should also use Optional, as in the original code. This would also require some other changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1426985065

   @fvaleri, @mimaison here's a sample run from my local setup:
   
   ```
   (ducktape) sarao@C02GG1KCML7H kafka % TC_PATHS="tests/kafkatest/benchmarks/core/benchmark_test.py::Benchmark.test_end_to_end_latency" bash tests/docker/run_tests.sh
   
   > Configure project :
   Starting build with version 3.5.0-SNAPSHOT (commit id 4ecc30e0) using Gradle 7.6, Java 1.8 and Scala 2.13.10
   Build properties: maxParallelForks=8, maxScalacThreads=8, maxTestRetries=0
   
   BUILD SUCCESSFUL in 10s
   170 actionable tasks: 170 up-to-date
   docker exec ducker01 bash -c "cd /opt/kafka-dev && ducktape --cluster-file /opt/kafka-dev/tests/docker/build/cluster.json  ./tests/kafkatest/benchmarks/core/benchmark_test.py::Benchmark.test_end_to_end_latency "
   /usr/local/lib/python3.9/dist-packages/paramiko/transport.py:236: CryptographyDeprecationWarning: Blowfish has been deprecated
     "class": algorithms.Blowfish,
   [INFO:2023-02-12 00:59:22,204]: starting test run with session id 2023-02-12--002...
   [INFO:2023-02-12 00:59:22,206]: running 10 tests...
   [INFO:2023-02-12 00:59:22,208]: Triggering test 1 of 10...
   [INFO:2023-02-12 00:59:22,271]: RunnerClient: Loading test {'directory': '/opt/kafka-dev/tests/kafkatest/benchmarks/core', 'file_name': 'benchmark_test.py', 'cls_name': 'Benchmark', 'method_name': 'test_end_to_end_latency', 'injected_args': {'security_protocol': 'SASL_PLAINTEXT', 'compression_type': 'none'}}
   [INFO:2023-02-12 00:59:22,300]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_PLAINTEXT.compression_type=none: on run 1/1
   [INFO:2023-02-12 00:59:22,321]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_PLAINTEXT.compression_type=none: Setting up...
   [INFO:2023-02-12 00:59:27,898]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_PLAINTEXT.compression_type=none: Running...
   [INFO:2023-02-12 01:03:02,248]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_PLAINTEXT.compression_type=none: Tearing down...
   [INFO:2023-02-12 01:03:34,257]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_PLAINTEXT.compression_type=none: PASS
   [INFO:2023-02-12 01:03:34,263]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_PLAINTEXT.compression_type=none: Data: {'latency_50th_ms': 7.0, 'latency_99th_ms': 47.0, 'latency_999th_ms': 169.0}
   [INFO:2023-02-12 01:03:34,347]: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
   [INFO:2023-02-12 01:03:34,350]: Triggering test 2 of 10...
   [INFO:2023-02-12 01:03:34,399]: RunnerClient: Loading test {'directory': '/opt/kafka-dev/tests/kafkatest/benchmarks/core', 'file_name': 'benchmark_test.py', 'cls_name': 'Benchmark', 'method_name': 'test_end_to_end_latency', 'injected_args': {'security_protocol': 'SASL_PLAINTEXT', 'compression_type': 'snappy'}}
   [INFO:2023-02-12 01:03:34,434]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_PLAINTEXT.compression_type=snappy: on run 1/1
   [INFO:2023-02-12 01:03:34,446]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_PLAINTEXT.compression_type=snappy: Setting up...
   [INFO:2023-02-12 01:03:47,646]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_PLAINTEXT.compression_type=snappy: Running...
   [INFO:2023-02-12 01:07:06,578]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_PLAINTEXT.compression_type=snappy: Tearing down...
   [INFO:2023-02-12 01:07:34,367]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_PLAINTEXT.compression_type=snappy: PASS
   [INFO:2023-02-12 01:07:34,372]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_PLAINTEXT.compression_type=snappy: Data: {'latency_50th_ms': 9.0, 'latency_99th_ms': 50.0, 'latency_999th_ms': 131.0}
   [INFO:2023-02-12 01:07:34,442]: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
   [INFO:2023-02-12 01:07:34,445]: Triggering test 3 of 10...
   [INFO:2023-02-12 01:07:34,485]: RunnerClient: Loading test {'directory': '/opt/kafka-dev/tests/kafkatest/benchmarks/core', 'file_name': 'benchmark_test.py', 'cls_name': 'Benchmark', 'method_name': 'test_end_to_end_latency', 'injected_args': {'security_protocol': 'SASL_SSL', 'compression_type': 'none'}}
   [INFO:2023-02-12 01:07:34,506]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_SSL.compression_type=none: on run 1/1
   [INFO:2023-02-12 01:07:34,520]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_SSL.compression_type=none: Setting up...
   [INFO:2023-02-12 01:07:38,538]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_SSL.compression_type=none: Running...
   [INFO:2023-02-12 01:11:25,549]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_SSL.compression_type=none: Tearing down...
   [INFO:2023-02-12 01:11:49,491]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_SSL.compression_type=none: PASS
   [INFO:2023-02-12 01:11:49,495]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_SSL.compression_type=none: Data: {'latency_50th_ms': 9.0, 'latency_99th_ms': 114.0, 'latency_999th_ms': 390.0}
   [INFO:2023-02-12 01:11:49,554]: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
   [INFO:2023-02-12 01:11:49,556]: Triggering test 4 of 10...
   [INFO:2023-02-12 01:11:49,608]: RunnerClient: Loading test {'directory': '/opt/kafka-dev/tests/kafkatest/benchmarks/core', 'file_name': 'benchmark_test.py', 'cls_name': 'Benchmark', 'method_name': 'test_end_to_end_latency', 'injected_args': {'security_protocol': 'SASL_SSL', 'compression_type': 'snappy'}}
   [INFO:2023-02-12 01:11:49,638]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_SSL.compression_type=snappy: on run 1/1
   [INFO:2023-02-12 01:11:49,651]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_SSL.compression_type=snappy: Setting up...
   [INFO:2023-02-12 01:11:53,742]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_SSL.compression_type=snappy: Running...
   [INFO:2023-02-12 01:15:14,809]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_SSL.compression_type=snappy: Tearing down...
   [INFO:2023-02-12 01:15:46,057]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_SSL.compression_type=snappy: PASS
   [INFO:2023-02-12 01:15:46,061]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=SASL_SSL.compression_type=snappy: Data: {'latency_50th_ms': 8.0, 'latency_99th_ms': 62.0, 'latency_999th_ms': 125.0}
   [INFO:2023-02-12 01:15:46,131]: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
   [INFO:2023-02-12 01:15:46,136]: Triggering test 5 of 10...
   [INFO:2023-02-12 01:15:46,190]: RunnerClient: Loading test {'directory': '/opt/kafka-dev/tests/kafkatest/benchmarks/core', 'file_name': 'benchmark_test.py', 'cls_name': 'Benchmark', 'method_name': 'test_end_to_end_latency', 'injected_args': {'security_protocol': 'PLAINTEXT', 'compression_type': 'none'}}
   [INFO:2023-02-12 01:15:46,228]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=PLAINTEXT.compression_type=none: on run 1/1
   [INFO:2023-02-12 01:15:46,319]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=PLAINTEXT.compression_type=none: Setting up...
   [INFO:2023-02-12 01:15:50,523]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=PLAINTEXT.compression_type=none: Running...
   [INFO:2023-02-12 01:17:00,744]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=PLAINTEXT.compression_type=none: Tearing down...
   [INFO:2023-02-12 01:17:30,336]: RunnerClient: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_end_to_end_latency.security_protocol=PLAINTEXT.compression_type=none: FAIL: TimeoutError("Kafka servers didn't register at ZK within 30 seconds")
   ```
   
   I stopped after test #5, which failed due to flakiness in the test environment (the Kafka servers didn't register at ZK within 30 seconds).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1088121137


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        if (args.length != 5 && args.length != 6) {
+            throw new TerseException("USAGE: java " + EndToEndLatency.class.getName()
+                    + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+        }
+
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        Optional<String> propertiesFile = args.length > 5 ? (Utils.isBlank(args[5]) ? Optional.empty() : Optional.of(args[5])) : Optional.empty();
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(propertiesFile, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(propertiesFile, brokers, acks)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(propertiesFile, brokers, topic);
+            }
+            setupConsumer(topic, consumer);
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> topicPartitions = consumer.
+                partitionsFor(topic).
+                stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                .collect(Collectors.toList());
+        consumer.assign(topicPartitions);
+        consumer.seekToEnd(topicPartitions);
+        consumer.assignment().forEach(consumer::position);
+    }
+
+    private static void printResults(int numMessages, double totalTime, long[] latencies) {
+        System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages / 1000.0 / 1000.0);
+        Arrays.sort(latencies);
+        int p50 = (int) latencies[(int) (latencies.length * 0.5)];
+        int p99 = (int) latencies[(int) (latencies.length * 0.99)];
+        int p999 = (int) latencies[(int) (latencies.length * 0.999)];
+        System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d%n", p50, p99, p999);
+    }
+
+    private static byte[] randomBytesOfLen(Random random, int length) {
+        byte[] randomBytes = new byte[length];
+        Arrays.fill(randomBytes, Integer.valueOf(random.nextInt(26) + 65).byteValue());
+        return randomBytes;
+    }
+
+    private static void createTopic(Optional<String> propertiesFile, String brokers, String topic) throws IOException {
+        short defaultReplicationFactor = 1;
+        int defaultNumPartitions = 1;

Review Comment:
   Can we keep these 2 variables as constants and move them next to `POLL_TIMEOUT_MS` above?



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        if (args.length != 5 && args.length != 6) {
+            throw new TerseException("USAGE: java " + EndToEndLatency.class.getName()
+                    + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+        }
+
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        Optional<String> propertiesFile = args.length > 5 ? (Utils.isBlank(args[5]) ? Optional.empty() : Optional.of(args[5])) : Optional.empty();
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(propertiesFile, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(propertiesFile, brokers, acks)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(propertiesFile, brokers, topic);
+            }
+            setupConsumer(topic, consumer);
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> topicPartitions = consumer.
+                partitionsFor(topic).
+                stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                .collect(Collectors.toList());

Review Comment:
   Sometimes we put the dots at the end of lines and other times at the front. Can you make it consistent in this file?



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        if (args.length != 5 && args.length != 6) {
+            throw new TerseException("USAGE: java " + EndToEndLatency.class.getName()
+                    + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+        }
+
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        Optional<String> propertiesFile = args.length > 5 ? (Utils.isBlank(args[5]) ? Optional.empty() : Optional.of(args[5])) : Optional.empty();
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(propertiesFile, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(propertiesFile, brokers, acks)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(propertiesFile, brokers, topic);
+            }
+            setupConsumer(topic, consumer);
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> topicPartitions = consumer.
+                partitionsFor(topic).
+                stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                .collect(Collectors.toList());
+        consumer.assign(topicPartitions);
+        consumer.seekToEnd(topicPartitions);
+        consumer.assignment().forEach(consumer::position);
+    }
+
+    private static void printResults(int numMessages, double totalTime, long[] latencies) {
+        System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages / 1000.0 / 1000.0);
+        Arrays.sort(latencies);
+        int p50 = (int) latencies[(int) (latencies.length * 0.5)];
+        int p99 = (int) latencies[(int) (latencies.length * 0.99)];
+        int p999 = (int) latencies[(int) (latencies.length * 0.999)];
+        System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d%n", p50, p99, p999);
+    }
+
+    private static byte[] randomBytesOfLen(Random random, int length) {
+        byte[] randomBytes = new byte[length];
+        Arrays.fill(randomBytes, Integer.valueOf(random.nextInt(26) + 65).byteValue());
+        return randomBytes;
+    }
+
+    private static void createTopic(Optional<String> propertiesFile, String brokers, String topic) throws IOException {
+        short defaultReplicationFactor = 1;
+        int defaultNumPartitions = 1;
+
+        System.out.printf("Topic \"%s\" does not exist. " +
+                        "Will create topic with %d partition(s) and replication factor = %d%n",
+                topic, defaultNumPartitions, defaultReplicationFactor);
+
+        Properties adminProps = loadPropsWithBootstrapServers(propertiesFile, brokers);
+        Admin adminClient = Admin.create(adminProps);
+        NewTopic newTopic = new NewTopic(topic, defaultNumPartitions, defaultReplicationFactor);
+        try {
+            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+        } catch (ExecutionException | InterruptedException e) {
+            System.out.printf("Creation of topic %s failed", topic);

Review Comment:
   Can we add a new line here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1071444925


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;

Review Comment:
   Hmm, I can do that, but IMO it's not going to make much difference in terms of readability or other factors. Of course, using Optional would help attain parity with the Scala code, but that's all we get.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1071471254


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;

Review Comment:
   It would make the code much more readable IMO, and it's a small change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1071526879


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;

Review Comment:
   Yeah, please don't replace `Option` with `null`. The equivalent is `Optional` in such cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1381690612

   Hi @mimaison, I added a few basic unit tests and updated the system test as needed (end_to_end_latency.py). I haven't set it up locally, but I am hoping those will run from here to validate whether the changes worked. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1437369614

   Thanks @mimaison . I addressed the comments. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1410076642

   > @mimaison , I updated the system test to point to the new class. That one place seemed to be the only one relevant in this case.
   
   Do you have test run output that shows it works and that the run time is similar? You can look at what I did for the JmxTool migration, which is also used by the system tests (STs). I would also suggest discarding the first run of such a test, because the test framework needs to start a bunch of containers.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1433009209

   Tests passed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1376021193

   We already depend on core when it comes to the tools test module, so we don't necessarily have to move things for that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1375755934

   @ijuma, I made the changes, but I couldn't find any tests associated with the Scala class. I wanted to know how I can test this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1375924994

   Actually, I pinged too soon :) Before getting it reviewed, I will test locally and also add a couple of tests. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1071348242


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;

Review Comment:
   You should also use Optional, as in the original code. This would also require some other changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1070781326


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;

Review Comment:
   Yes that was a miss. Added the filter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1382883307

   https://github.com/apache/kafka/pull/13095#discussion_r1069689088
   
   I looked at other classes in the tools module and saw argparse4j being used in them, and assumed that this is the direction that has been chosen. Is that not the case? If not, I can revert to using plain args. Please let me know.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1071445060


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));

Review Comment:
   > I don't think we need to keep the original code when it's strictly worse (for example the original code ends up exhausting the iterator to get the size).
   > 
   
   Fair enough. Thanks.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1408441506

   I simply meant that it looks like some changes in the system tests are required too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1410032222

   @mimaison , I updated the system test to point to the new class. That one place seemed to be the only one relevant in this case. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1407268172

   > EndToEndLatencyService
   
   Thanks @mimaison. Actually, I lack some context here. The class `TestEndToEndLatency` was renamed to `EndToEndLatency` with this very old PR: https://github.com/apache/kafka/commit/e43c9aff92c57da6abb0c1d0af3431a550110a89#diff-52dbfa7ab683a53b91a84c35f309b56ff1b2a1cd94e4ccb86c5843e9e44a050f and in a subsequent PR, the support for zk_connect was removed. So, the line you highlighted above may not need to be changed. Also, there's no reference to `TestEndToEndLatency` in the project anymore in trunk, so I am assuming it would be shipped only with older versions (< 0.9), maybe?
   
   Were you referring to this line instead? https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/performance/end_to_end_latency.py#L127
   which is invoked on line #91? I agree, that needs to be changed. Please let me know.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1432614316

   @fvaleri , I fixed the above error. Some of the containers had died because of which the error occurred. I have a clean run of the system test now:
   
   ```
   ================================================================================
   SESSION REPORT (ALL TESTS)
   ducktape version: 0.11.3
   session_id:       2023-02-15--005
   run time:         7 minutes 46.389 seconds
   tests run:        7
   passed:           7
   flaky:            0
   failed:           0
   ignored:          0
   ================================================================================
   test_id:    kafkatest.sanity_checks.test_performance_services.PerformanceServiceTest.test_version.version=0.8.2.2.new_consumer=False
   status:     PASS
   run time:   56.724 seconds
   {"producer_performance": {"records_per_sec": 17123.287671, "mb_per_sec": 1.63}, "end_to_end_latency": {"latency_50th_ms": 1.0, "latency_99th_ms": 5.0, "latency_999th_ms": 16.0}, "consumer_performance": {"records_per_sec": 1428571.4286, "mb_per_sec": 136.2392}}
   --------------------------------------------------------------------------------
   test_id:    kafkatest.sanity_checks.test_performance_services.PerformanceServiceTest.test_version.version=0.9.0.1
   status:     PASS
   run time:   1 minute 7.159 seconds
   {"producer_performance": {"records_per_sec": 10152.284264, "mb_per_sec": 0.97}, "end_to_end_latency": {"latency_50th_ms": 1.0, "latency_99th_ms": 10.0, "latency_999th_ms": 23.0}, "consumer_performance": {"records_per_sec": 62305.296, "mb_per_sec": 3.5948}}
   --------------------------------------------------------------------------------
   test_id:    kafkatest.sanity_checks.test_performance_services.PerformanceServiceTest.test_version.version=0.9.0.1.new_consumer=False
   status:     PASS
   run time:   58.971 seconds
   {"producer_performance": {"records_per_sec": 13262.599469, "mb_per_sec": 1.26}, "end_to_end_latency": {"latency_50th_ms": 1.0, "latency_99th_ms": 9.0, "latency_999th_ms": 23.0}, "consumer_performance": {"records_per_sec": 1428571.4286, "mb_per_sec": 136.2392}}
   --------------------------------------------------------------------------------
   test_id:    kafkatest.sanity_checks.test_performance_services.PerformanceServiceTest.test_version.version=1.1.1.new_consumer=False
   status:     PASS
   run time:   1 minute 6.057 seconds
   {"producer_performance": {"records_per_sec": 11350.737798, "mb_per_sec": 1.08}, "end_to_end_latency": {"latency_50th_ms": 2.0, "latency_99th_ms": 10.0, "latency_999th_ms": 21.0}, "consumer_performance": {"records_per_sec": 1666666.6667, "mb_per_sec": 158.9457}}
   --------------------------------------------------------------------------------
   test_id:    kafkatest.sanity_checks.test_performance_services.PerformanceServiceTest.test_version.version=dev.metadata_quorum=COLOCATED_KRAFT
   status:     PASS
   run time:   1 minute 8.146 seconds
   {"producer_performance": {"records_per_sec": 4918.839154, "mb_per_sec": 0.47}, "end_to_end_latency": {"latency_50th_ms": 2.0, "latency_99th_ms": 12.0, "latency_999th_ms": 34.0}, "consumer_performance": {"records_per_sec": 12199.5134, "mb_per_sec": 1.1609}}
   --------------------------------------------------------------------------------
   test_id:    kafkatest.sanity_checks.test_performance_services.PerformanceServiceTest.test_version.version=dev.metadata_quorum=REMOTE_KRAFT
   status:     PASS
   run time:   1 minute 16.390 seconds
   {"producer_performance": {"records_per_sec": 11520.737327, "mb_per_sec": 1.1}, "end_to_end_latency": {"latency_50th_ms": 2.0, "latency_99th_ms": 13.0, "latency_999th_ms": 42.0}, "consumer_performance": {"records_per_sec": 11722.807, "mb_per_sec": 1.1159}}
   --------------------------------------------------------------------------------
   test_id:    kafkatest.sanity_checks.test_performance_services.PerformanceServiceTest.test_version.version=dev.metadata_quorum=ZK
   status:     PASS
   run time:   1 minute 12.011 seconds
   {"producer_performance": {"records_per_sec": 7547.169811, "mb_per_sec": 0.72}, "end_to_end_latency": {"latency_50th_ms": 2.0, "latency_99th_ms": 13.0, "latency_999th_ms": 29.0}, "consumer_performance": {"records_per_sec": 11990.8362, "mb_per_sec": 1.1031}}
   --------------------------------------------------------------------------------
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1375845098

   Thanks for the PR. Can we add a test in that case? We'd want to verify manually that the test matches the previous behavior.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
mimaison commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1375933240

   Yes I can review this.
   
   I started looking at [KAFKA-14525](https://issues.apache.org/jira/browse/KAFKA-14525) because we were stepping on each other's toes in [KAFKA-14470](https://issues.apache.org/jira/browse/KAFKA-14470), but we should finish that first.
   
   Many of the tests for these commands start full clusters and all that test logic is currently in core. We should be able to move it to server-common but I'm not quite sure if we want to drag many ZooKeeper bits there.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1387172335

   > Builds on 8 and 17 are ok. Test failure on 11 is unrelated and it works fine on my machine.
   > 
   > Thanks.
   
   Thanks @fvaleri . @mimaison would you be able to take a look as well? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1070780996


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            //Results
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> topicPartitions = consumer.
+                partitionsFor(topic).
+                stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                .collect(Collectors.toList());
+        consumer.assign(topicPartitions);
+        consumer.seekToEnd(topicPartitions);
+        consumer.assignment().forEach(consumer::position);
+    }
+
+    private static void printResults(int numMessages, double totalTime, long[] latencies) {
+        System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages / 1000.0 / 1000.0);
+        Arrays.sort(latencies);
+        int p50 = (int) latencies[(int) (latencies.length * 0.5)];
+        int p99 = (int) latencies[(int) (latencies.length * 0.99)];
+        int p999 = (int) latencies[(int) (latencies.length * 0.999)];
+        System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d", p50, p99, p999);
+    }
+
+    private static byte[] randomBytesOfLen(Random random, int length) {
+        byte[] randomBytes = new byte[length];
+        Arrays.fill(randomBytes, Integer.valueOf(random.nextInt(26) + 65).byteValue());
+        return randomBytes;
+    }
+
+    private static void createTopic(Properties props, String topic) {
+
+        short defaultReplicationFactor = 1;
+        int defaultNumPartitions = 1;
+
+        System.out.printf("Topic \"%s\" does not exist. " +
+                        "Will create topic with %d partition(s) and replication factor = %d%n",
+                topic, defaultNumPartitions, defaultReplicationFactor);
+
+        Admin adminClient = Admin.create(props);
+        NewTopic newTopic = new NewTopic(topic, defaultNumPartitions, defaultReplicationFactor);
+        adminClient.createTopics(Collections.singletonList(newTopic));
+        try {
+            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+        } catch (ExecutionException | InterruptedException e) {
+            System.out.printf("Creation of topic %s failed", topic);
+            throw new RuntimeException(e);
+        } finally {
+            Utils.closeQuietly(adminClient, "AdminClient");
+        }
+    }
+
+    private static Properties loadPropsWithBootstrapServers(String propertiesFile) throws IOException {
+        return propertiesFile != null ? Utils.loadProps(propertiesFile) : new Properties();
+    }
+
+    private static KafkaConsumer<byte[], byte[]> createKafkaConsumer(Properties properties, String brokers) {
+        Properties consumerProps = new Properties(properties);
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1070782792


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));

Review Comment:
   The reason I am not using the iterator is that `ConsumerRecords` exposes methods like `isEmpty` and `count`, which seemed easier to understand when used in the `validate` method. Let me know if you think otherwise.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1405382219

   > Sorry for the delay. Thanks for the PR, I left a few minor suggestions.
   
   No problem! I addressed the comments. Thanks for the review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1069260140


##########
tests/kafkatest/services/performance/end_to_end_latency.py:
##########
@@ -87,7 +87,7 @@ def start_cmd(self, node):
         cmd = "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % EndToEndLatencyService.LOG4J_CONFIG
         if node.version.consumer_supports_bootstrap_server():
             cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s %(java_class_name)s " % args
-            cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(config_file)s" % args
+            cmd += "-b %(bootstrap_servers)s -t %(topic)s -n %(num_records)d -a %(acks)d -s %(message_bytes)d -f %(config_file)s" % args

Review Comment:
   This change and https://github.com/apache/kafka/pull/13095/files#diff-1a3735187400a54aac5a802fb2d2ff6d4fe9cdfbc5fd953a45e2890ad43f58cfR157-R207 are needed because, from what I understood, positional arguments in argparse4j can't be made optional. The properties file is an optional argument in the Scala code, but it worked in that case because the main arguments were used as is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1070781134


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            //Results
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> topicPartitions = consumer.
+                partitionsFor(topic).
+                stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                .collect(Collectors.toList());
+        consumer.assign(topicPartitions);
+        consumer.seekToEnd(topicPartitions);
+        consumer.assignment().forEach(consumer::position);
+    }
+
+    private static void printResults(int numMessages, double totalTime, long[] latencies) {
+        System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages / 1000.0 / 1000.0);
+        Arrays.sort(latencies);
+        int p50 = (int) latencies[(int) (latencies.length * 0.5)];
+        int p99 = (int) latencies[(int) (latencies.length * 0.99)];
+        int p999 = (int) latencies[(int) (latencies.length * 0.999)];
+        System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d", p50, p99, p999);
+    }
+
+    private static byte[] randomBytesOfLen(Random random, int length) {
+        byte[] randomBytes = new byte[length];
+        Arrays.fill(randomBytes, Integer.valueOf(random.nextInt(26) + 65).byteValue());
+        return randomBytes;
+    }
+
+    private static void createTopic(Properties props, String topic) {
+
+        short defaultReplicationFactor = 1;
+        int defaultNumPartitions = 1;
+
+        System.out.printf("Topic \"%s\" does not exist. " +
+                        "Will create topic with %d partition(s) and replication factor = %d%n",
+                topic, defaultNumPartitions, defaultReplicationFactor);
+
+        Admin adminClient = Admin.create(props);
+        NewTopic newTopic = new NewTopic(topic, defaultNumPartitions, defaultReplicationFactor);
+        adminClient.createTopics(Collections.singletonList(newTopic));
+        try {
+            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+        } catch (ExecutionException | InterruptedException e) {
+            System.out.printf("Creation of topic %s failed", topic);
+            throw new RuntimeException(e);
+        } finally {
+            Utils.closeQuietly(adminClient, "AdminClient");
+        }
+    }
+
+    private static Properties loadPropsWithBootstrapServers(String propertiesFile) throws IOException {
+        return propertiesFile != null ? Utils.loadProps(propertiesFile) : new Properties();
+    }
+
+    private static KafkaConsumer<byte[], byte[]> createKafkaConsumer(Properties properties, String brokers) {
+        Properties consumerProps = new Properties(properties);
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + System.currentTimeMillis());
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0"); //ensure we have no temporal batching
+        return new KafkaConsumer<>(consumerProps);
+    }
+
+    private static KafkaProducer<byte[], byte[]> createKafkaProducer(Properties properties, String acks, String brokers) {
+        Properties producerProps = new Properties(properties);
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

Review Comment:
   done



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1072418814


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        if (args.length != 5 && args.length != 6) {
+            throw new TerseException("USAGE: java " + EndToEndLatency.class.getName()
+                    + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+        }
+
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        Optional<String> propertiesFile = args.length > 5 ? (Utils.isBlank(args[5]) ? Optional.empty() : Optional.of(args[5])) : Optional.empty();
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(propertiesFile, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(propertiesFile, brokers, acks)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(propertiesFile, brokers, topic);
+            }
+            setupConsumer(topic, consumer);
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> topicPartitions = consumer.
+                partitionsFor(topic).
+                stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                .collect(Collectors.toList());
+        consumer.assign(topicPartitions);
+        consumer.seekToEnd(topicPartitions);
+        consumer.assignment().forEach(consumer::position);
+    }
+
+    private static void printResults(int numMessages, double totalTime, long[] latencies) {
+        System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages / 1000.0 / 1000.0);
+        Arrays.sort(latencies);
+        int p50 = (int) latencies[(int) (latencies.length * 0.5)];
+        int p99 = (int) latencies[(int) (latencies.length * 0.99)];
+        int p999 = (int) latencies[(int) (latencies.length * 0.999)];
+        System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d%n", p50, p99, p999);
+    }
+
+    private static byte[] randomBytesOfLen(Random random, int length) {
+        byte[] randomBytes = new byte[length];
+        Arrays.fill(randomBytes, Integer.valueOf(random.nextInt(26) + 65).byteValue());
+        return randomBytes;
+    }
+
+    private static void createTopic(Optional<String> propertiesFile, String brokers, String topic) throws IOException {
+        short defaultReplicationFactor = 1;
+        int defaultNumPartitions = 1;
+
+        System.out.printf("Topic \"%s\" does not exist. " +
+                        "Will create topic with %d partition(s) and replication factor = %d%n",
+                topic, defaultNumPartitions, defaultReplicationFactor);
+
+        Properties adminProps = loadPropsWithBootstrapServers(propertiesFile, brokers);
+        Admin adminClient = Admin.create(adminProps);
+        NewTopic newTopic = new NewTopic(topic, defaultNumPartitions, defaultReplicationFactor);
+        adminClient.createTopics(Collections.singletonList(newTopic));

Review Comment:
   You need to remove this line to avoid a `TopicExistsException` when the test topic does not exist. Once you do that, I'll approve.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1071369472


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));

Review Comment:
   I don't think we need to keep the original code when it's strictly worse (for example the original code ends up exhausting the iterator to get the size).
   
   We should keep the original code if there isn't a clear improvement from changing it, and whatever changes we do make should be localized - changes that affect many methods, other files, etc. are best avoided if possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1071369472


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));

Review Comment:
   I don't think we need to keep the original code when it's strictly worse (for example the original code ends up exhausting the iterator to get the size).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1088168895


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        if (args.length != 5 && args.length != 6) {
+            throw new TerseException("USAGE: java " + EndToEndLatency.class.getName()
+                    + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+        }
+
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        Optional<String> propertiesFile = args.length > 5 ? (Utils.isBlank(args[5]) ? Optional.empty() : Optional.of(args[5])) : Optional.empty();
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(propertiesFile, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(propertiesFile, brokers, acks)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(propertiesFile, brokers, topic);
+            }
+            setupConsumer(topic, consumer);
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> topicPartitions = consumer.
+                partitionsFor(topic).
+                stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                .collect(Collectors.toList());

Review Comment:
   I made the usage of dots consistent in this block of code. I couldn't find any other inconsistencies with respect to dots. There was one involving `+` used for concatenation, which I have also made consistent.



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        if (args.length != 5 && args.length != 6) {
+            throw new TerseException("USAGE: java " + EndToEndLatency.class.getName()
+                    + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+        }
+
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        Optional<String> propertiesFile = args.length > 5 ? (Utils.isBlank(args[5]) ? Optional.empty() : Optional.of(args[5])) : Optional.empty();
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(propertiesFile, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(propertiesFile, brokers, acks)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(propertiesFile, brokers, topic);
+            }
+            setupConsumer(topic, consumer);
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> topicPartitions = consumer.
+                partitionsFor(topic).
+                stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                .collect(Collectors.toList());
+        consumer.assign(topicPartitions);
+        consumer.seekToEnd(topicPartitions);
+        consumer.assignment().forEach(consumer::position);
+    }
+
+    private static void printResults(int numMessages, double totalTime, long[] latencies) {
+        System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages / 1000.0 / 1000.0);
+        Arrays.sort(latencies);
+        int p50 = (int) latencies[(int) (latencies.length * 0.5)];
+        int p99 = (int) latencies[(int) (latencies.length * 0.99)];
+        int p999 = (int) latencies[(int) (latencies.length * 0.999)];
+        System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d%n", p50, p99, p999);
+    }
+
+    private static byte[] randomBytesOfLen(Random random, int length) {
+        byte[] randomBytes = new byte[length];
+        Arrays.fill(randomBytes, Integer.valueOf(random.nextInt(26) + 65).byteValue());
+        return randomBytes;
+    }
+
+    private static void createTopic(Optional<String> propertiesFile, String brokers, String topic) throws IOException {
+        short defaultReplicationFactor = 1;
+        int defaultNumPartitions = 1;
+
+        System.out.printf("Topic \"%s\" does not exist. " +
+                        "Will create topic with %d partition(s) and replication factor = %d%n",
+                topic, defaultNumPartitions, defaultReplicationFactor);
+
+        Properties adminProps = loadPropsWithBootstrapServers(propertiesFile, brokers);
+        Admin adminClient = Admin.create(adminProps);
+        NewTopic newTopic = new NewTopic(topic, defaultNumPartitions, defaultReplicationFactor);
+        try {
+            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+        } catch (ExecutionException | InterruptedException e) {
+            System.out.printf("Creation of topic %s failed", topic);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1112025397


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer

Review Comment:
   Can we update the comment to include the topic argument that's required too?
   
   Also, let's format this Javadoc comment a bit so it renders nicely.



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+    private final static long POLL_TIMEOUT_MS = 60000;
+    private final static short DEFAULT_REPLICATION_FACTOR = 1;
+    private final static int DEFAULT_NUM_PARTITIONS = 1;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        if (args.length != 5 && args.length != 6) {
+            throw new TerseException("USAGE: java " + EndToEndLatency.class.getName()
+                    + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+        }
+
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        Optional<String> propertiesFile = args.length > 5 ? (Utils.isBlank(args[5]) ? Optional.empty() : Optional.of(args[5])) : Optional.empty();

Review Comment:
   Would `Optional<String> propertiesFile = (args.length > 5 && !Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty();` be more readable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1427752677

   Thanks @fvaleri for pointing me to the fix you made for the JMXTool issue. I made the changes so that the version is also taken into account when choosing the EndToEndLatency class.
   
   I did point out the usage of TestEndToEndLatency [here](https://github.com/apache/kafka/pull/13095#issuecomment-1407268172), but I don't have any context on it. Also, when I try to run `kafkatest.sanity_checks.test_performance_services.PerformanceServiceTest.test_version.version`, I get this:
   
   
   ```
   [INFO:2023-02-13 03:03:31,147]: RunnerClient: kafkatest.sanity_checks.test_performance_services.PerformanceServiceTest.test_version.version=dev.metadata_quorum=ZK: FAIL: gaierror(-2, 'Name or service not known')
   Traceback (most recent call last):
     File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 185, in _do_run
       self.setup_test()
     File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 237, in setup_test
       self.test.setup()
     File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/test.py", line 74, in setup
       self.setUp()
     File "/opt/kafka-dev/tests/kafkatest/sanity_checks/test_performance_services.py", line 38, in setUp
       self.zk.start()
     File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 265, in start
       self.start_node(node, **kwargs)
     File "/opt/kafka-dev/tests/kafkatest/services/zookeeper.py", line 95, in start_node
       node.account.ssh("mkdir -p %s" % ZookeeperService.DATA)
     File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/remoteaccount.py", line 35, in wrapper
       return method(self, *args, **kwargs)
     File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/remoteaccount.py", line 300, in ssh
       client = self.ssh_client
     File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/remoteaccount.py", line 215, in ssh_client
       self._set_ssh_client()
     File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/remoteaccount.py", line 35, in wrapper
       return method(self, *args, **kwargs)
     File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/remoteaccount.py", line 189, in _set_ssh_client
       client.connect(
     File "/usr/local/lib/python3.9/dist-packages/paramiko/client.py", line 340, in connect
       to_try = list(self._families_and_addresses(hostname, port))
     File "/usr/local/lib/python3.9/dist-packages/paramiko/client.py", line 203, in _families_and_addresses
       addrinfos = socket.getaddrinfo(
     File "/usr/lib/python3.9/socket.py", line 953, in getaddrinfo
       for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
   socket.gaierror: [Errno -2] Name or service not known
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1441516170

   Tests have passed for one of the builds.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1441214746

   Thanks, Michael. I added a happy-path test case.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1070545176


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+    private final static long TIMEOUT = 60000;
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+
+        ArgumentParser parser = addArguments();
+
+        Namespace res = null;
+        try {
+            res = parser.parseArgs(args);
+        } catch (ArgumentParserException e) {
+            if (args.length == 0) {
+                parser.printHelp();
+                Exit.exit(0);
+            } else {
+                parser.handleError(e);
+                Exit.exit(1);
+            }
+        }
+
+        String brokers = res.getString("broker_list");
+        String topic = res.getString("topic");
+        int numMessages = res.getInt("num_messages");
+        String acks = res.getString("producer_acks");
+        int messageSizeBytes = res.getInt("message_size_bytes");
+        String propertiesFile = res.getString("properties_file");
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(TIMEOUT));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            //Results
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+
+    }
+
+    static ArgumentParser addArguments() {
+
+        ArgumentParser parser = ArgumentParsers
+                .newArgumentParser("end-to-end-latency")
+                .defaultHelp(true)
+                .description("This tool records the average end to end latency for a single message to travel through Kafka");
+
+
+        parser
+                .addArgument("-b", "--brokers")
+                .action(store())
+                .type(String.class)
+                .dest("broker_list")
+                .help("The location of the bootstrap broker for both the producer and the consumer");
+
+        parser
+                .addArgument("-t", "--topic")
+                .action(store())
+                .type(String.class)
+                .dest("topic")
+                .help("The Kakfa topic to send/receive messages to/from");
+
+        parser
+                .addArgument("-n", "--num-records")
+                .action(store())
+                .type(Integer.class)
+                .dest("num_messages")
+                .help("The number of messages to send");
+
+        parser
+                .addArgument("-a", "--acks")
+                .action(store())
+                .type(String.class)
+                .dest("producer_acks")
+                .help("The number of messages to send");
+
+        parser
+                .addArgument("-s", "--message-bytes")
+                .required(true)
+                .action(store())
+                .type(Integer.class)
+                .dest("message_size_bytes")
+                .help("Size of each message in bytes");
+
+        parser
+                .addArgument("-f", "--properties-file")
+                .action(store())
+                .type(String.class)
+                .dest("properties_file");
+
+        return parser;
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + TIMEOUT + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> topicPartitions = consumer.
+                partitionsFor(topic).
+                stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                .collect(Collectors.toList());
+        consumer.assign(topicPartitions);
+        consumer.seekToEnd(topicPartitions);
+        consumer.assignment().forEach(consumer::position);
+    }
+
+    private static void printResults(int numMessages, double totalTime, long[] latencies) {
+        System.out.printf("Avg latency: %.4f ms\n", totalTime / numMessages / 1000.0 / 1000.0);
+        Arrays.sort(latencies);
+        int p50 = (int) latencies[(int) (latencies.length * 0.5)];
+        int p99 = (int) latencies[(int) (latencies.length * 0.99)];
+        int p999 = (int) latencies[(int) (latencies.length * 0.999)];
+        System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d", p50, p99, p999);
+    }
+
+    private static byte[] randomBytesOfLen(Random random, int length) {
+        byte[] randomBytes = new byte[length];
+        Arrays.fill(randomBytes, new Integer(random.nextInt(26) + 65).byteValue());

Review Comment:
   done



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+    private final static long TIMEOUT = 60000;
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1070781463


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {

Review Comment:
   Done. I guess this is now more or less similar to the original Scala code.



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            //Results

Review Comment:
   Yeah, not needed. Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1383408137

   @fvaleri, thanks. I made the changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1070379239


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+    private final static long TIMEOUT = 60000;
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+
+        ArgumentParser parser = addArguments();
+
+        Namespace res = null;
+        try {
+            res = parser.parseArgs(args);
+        } catch (ArgumentParserException e) {
+            if (args.length == 0) {
+                parser.printHelp();
+                Exit.exit(0);
+            } else {
+                parser.handleError(e);
+                Exit.exit(1);
+            }
+        }
+
+        String brokers = res.getString("broker_list");
+        String topic = res.getString("topic");
+        int numMessages = res.getInt("num_messages");
+        String acks = res.getString("producer_acks");
+        int messageSizeBytes = res.getInt("message_size_bytes");
+        String propertiesFile = res.getString("properties_file");
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {

Review Comment:
   Oh, corrected that now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1071348600


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));

Review Comment:
   I would prefer to stick with the original code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1071349940


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;

Review Comment:
   The original code is using Optional, which is a much better approach, and this also requires some changes further down.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1071369472


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));

Review Comment:
   I don't think we need to keep the original code when it's strictly worse.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1071438783


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            if (args.length != 5 && args.length != 6) {
+                System.err.println("USAGE: java " + EndToEndLatency.class.getName()
+                        + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+                return 1;
+            }
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        String propertiesFile = args.length > 5 ? args[5] : null;
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));

Review Comment:
   Yeah, in this case I feel that using the methods provided by `ConsumerRecords` is cleaner.
   
   > We should keep the original code if there isn't a clear improvement from changing and whatever changes we do should be localized - changes that affect many methods, other files, etc. are best avoided if possible.
   
   Ack. Will keep that in mind.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1072418814


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka.
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * topic = name of the topic the test messages are sent to and read back from
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ * properties_file = (optional) properties file passed on when creating the consumer, producer and admin client
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    private final static long POLL_TIMEOUT_MS = 60000;
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    /**
+     * Runs {@link #execute(String...)} and turns its outcome into a status code instead of
+     * letting exceptions escape.
+     *
+     * @param args command-line arguments, forwarded unchanged to {@code execute}
+     * @return 0 when {@code execute} returns normally, 1 when it throws
+     */
+    static int mainNoExit(String... args) {
+        int status = 0;
+        try {
+            execute(args);
+        } catch (TerseException terse) {
+            // Terse failures are reported with their message only.
+            System.err.println(terse.getMessage());
+            status = 1;
+        } catch (Throwable failure) {
+            // Any other failure additionally gets its stack trace printed.
+            System.err.println(failure.getMessage());
+            System.err.println(Utils.stackTrace(failure));
+            status = 1;
+        }
+        return status;
+    }
+
+    // Visible for testing
+    /**
+     * Parses the command-line arguments, then sends {@code num_messages} messages one at a time,
+     * polling for each one right after its send completes and timing the round trip.
+     * The topic is created first if the consumer does not list it. Progress is printed every
+     * 1000 messages and a summary is printed at the end.
+     *
+     * @param args broker_list, topic, num_messages, producer_acks, message_size_bytes and an
+     *             optional properties_file, in that order
+     * @throws TerseException if the number of arguments is neither 5 nor 6
+     * @throws IllegalArgumentException if producer_acks is neither "1" nor "all"
+     * @throws Exception any other failure raised while producing or consuming
+     */
+    static void execute(String... args) throws Exception {
+        boolean argCountOk = args.length == 5 || args.length == 6;
+        if (!argCountOk) {
+            throw new TerseException("USAGE: java " + EndToEndLatency.class.getName()
+                    + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
+        }
+
+        String brokers = args[0];
+        String topic = args[1];
+        int numMessages = Integer.parseInt(args[2]);
+        String acks = args[3];
+        int messageSizeBytes = Integer.parseInt(args[4]);
+        // A missing or blank sixth argument means "no properties file".
+        Optional<String> propertiesFile = args.length > 5 && !Utils.isBlank(args[5])
+                ? Optional.of(args[5])
+                : Optional.empty();
+
+        if (!"1".equals(acks) && !"all".equals(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(propertiesFile, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(propertiesFile, brokers, acks)) {
+
+            boolean topicExists = consumer.listTopics().containsKey(topic);
+            if (!topicExists) {
+                createTopic(propertiesFile, brokers, topic);
+            }
+            setupConsumer(topic, consumer);
+
+            Random rng = new Random(0);
+            long[] latencies = new long[numMessages];
+            double totalTime = 0.0;
+
+            for (int sent = 0; sent < numMessages; sent++) {
+                byte[] payload = randomBytesOfLen(rng, messageSizeBytes);
+
+                // Timed section: synchronous send followed immediately by a poll for the record.
+                long startNanos = System.nanoTime();
+                producer.send(new ProducerRecord<>(topic, payload)).get();
+                ConsumerRecords<byte[], byte[]> polled = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+                long elapsedNanos = System.nanoTime() - startNanos;
+
+                validate(consumer, payload, polled);
+
+                // Report progress (elapsed time in milliseconds) every 1000 messages.
+                if (sent % 1000 == 0) {
+                    System.out.println(sent + "\t" + elapsedNanos / 1000.0 / 1000.0);
+                }
+                totalTime += elapsedNanos;
+                // Stored in whole milliseconds for the percentile report.
+                latencies[sent] = elapsedNanos / 1000 / 1000;
+            }
+
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    /**
+     * Assigns every partition of {@code topic} to the consumer and seeks each one to its end.
+     *
+     * @param topic    topic whose partitions are assigned
+     * @param consumer consumer to configure
+     */
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> partitions = consumer.partitionsFor(topic)
+                .stream()
+                .map(info -> new TopicPartition(info.topic(), info.partition()))
+                .collect(Collectors.toList());
+        consumer.assign(partitions);
+        consumer.seekToEnd(partitions);
+        // Query the position of each assigned partition; presumably this makes the
+        // seekToEnd above take effect before the first send - confirm against the KafkaConsumer docs.
+        for (TopicPartition assigned : consumer.assignment()) {
+            consumer.position(assigned);
+        }
+    }
+
+    /**
+     * Prints the average latency and the 50th, 99th and 99.9th percentile latencies.
+     * Sorts {@code latencies} in place. When there is nothing to report (no messages or an
+     * empty array) a short notice is printed instead of failing on an out-of-range index.
+     *
+     * @param numMessages number of messages the total was accumulated over
+     * @param totalTime   sum of all per-message latencies, in nanoseconds
+     * @param latencies   per-message latencies, in milliseconds
+     */
+    private static void printResults(int numMessages, double totalTime, long[] latencies) {
+        if (numMessages <= 0 || latencies.length == 0) {
+            System.out.println("No messages were sent; no latency statistics to report");
+            return;
+        }
+
+        System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages / 1000.0 / 1000.0);
+        Arrays.sort(latencies);
+        // The indices are always below latencies.length because each factor is below 1.
+        long p50 = latencies[(int) (latencies.length * 0.5)];
+        long p99 = latencies[(int) (latencies.length * 0.99)];
+        long p999 = latencies[(int) (latencies.length * 0.999)];
+        System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d%n", p50, p99, p999);
+    }
+
+    /**
+     * Builds a payload of {@code length} bytes, all set to the same randomly chosen value
+     * between 65 and 90 (ASCII 'A' to 'Z').
+     *
+     * @param random source of the single random draw
+     * @param length number of bytes in the payload
+     * @return a new array of {@code length} identical bytes
+     */
+    private static byte[] randomBytesOfLen(Random random, int length) {
+        // One draw per payload: the whole array repeats a single letter.
+        byte letter = (byte) (random.nextInt(26) + 65);
+        byte[] payload = new byte[length];
+        Arrays.fill(payload, letter);
+        return payload;
+    }
+
+    private static void createTopic(Optional<String> propertiesFile, String brokers, String topic) throws IOException {
+        short defaultReplicationFactor = 1;
+        int defaultNumPartitions = 1;
+
+        System.out.printf("Topic \"%s\" does not exist. " +
+                        "Will create topic with %d partition(s) and replication factor = %d%n",
+                topic, defaultNumPartitions, defaultReplicationFactor);
+
+        Properties adminProps = loadPropsWithBootstrapServers(propertiesFile, brokers);
+        Admin adminClient = Admin.create(adminProps);
+        NewTopic newTopic = new NewTopic(topic, defaultNumPartitions, defaultReplicationFactor);
+        adminClient.createTopics(Collections.singletonList(newTopic));

Review Comment:
   You need to remove this line to avoid `TopicExistsException` when the test topic does not exist. Once you do that, I'll approve.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org