You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/13 17:40:04 UTC

[GitHub] [kafka] fvaleri commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

fvaleri commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1069628843


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+    private final static long TIMEOUT = 60000;
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+
+        ArgumentParser parser = addArguments();
+
+        Namespace res = null;
+        try {
+            res = parser.parseArgs(args);
+        } catch (ArgumentParserException e) {
+            if (args.length == 0) {
+                parser.printHelp();
+                Exit.exit(0);
+            } else {
+                parser.handleError(e);
+                Exit.exit(1);
+            }
+        }
+
+        String brokers = res.getString("broker_list");
+        String topic = res.getString("topic");
+        int numMessages = res.getInt("num_messages");
+        String acks = res.getString("producer_acks");
+        int messageSizeBytes = res.getInt("message_size_bytes");
+        String propertiesFile = res.getString("properties_file");
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {

Review Comment:
   Here you have consumer properties passed to the producer instance with warnings printed out to the console. That's not what the original code does.



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+    private final static long TIMEOUT = 60000;

Review Comment:
   Why is this not at the start of the class? Can we rename it to `POLL_TIMEOUT_MS`?



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+    private final static long TIMEOUT = 60000;
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+
+        ArgumentParser parser = addArguments();
+
+        Namespace res = null;
+        try {
+            res = parser.parseArgs(args);
+        } catch (ArgumentParserException e) {
+            if (args.length == 0) {
+                parser.printHelp();
+                Exit.exit(0);
+            } else {
+                parser.handleError(e);
+                Exit.exit(1);
+            }
+        }
+
+        String brokers = res.getString("broker_list");
+        String topic = res.getString("topic");
+        int numMessages = res.getInt("num_messages");
+        String acks = res.getString("producer_acks");
+        int messageSizeBytes = res.getInt("message_size_bytes");
+        String propertiesFile = res.getString("properties_file");
+
+        if (!Arrays.asList("1", "all").contains(acks)) {
+            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
+        }
+
+        Properties props;
+        try {
+            props = loadPropsWithBootstrapServers(propertiesFile);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Properties file not found");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(props, brokers);
+             KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props, acks, brokers)) {
+
+            if (!consumer.listTopics().containsKey(topic)) {
+                createTopic(props, topic);
+            }
+
+            setupConsumer(topic, consumer);
+
+            double totalTime = 0.0;
+            long[] latencies = new long[numMessages];
+            Random random = new Random(0);
+
+            for (int i = 0; i < numMessages; i++) {
+                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+                long begin = System.nanoTime();
+                //Send message (of random bytes) synchronously then immediately poll for it
+                producer.send(new ProducerRecord<>(topic, message)).get();
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(TIMEOUT));
+                long elapsed = System.nanoTime() - begin;
+
+                validate(consumer, message, records);
+
+                //Report progress
+                if (i % 1000 == 0)
+                    System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
+                totalTime += elapsed;
+                latencies[i] = elapsed / 1000 / 1000;
+            }
+
+            //Results
+            printResults(numMessages, totalTime, latencies);
+            consumer.commitSync();
+        }
+
+    }
+
+    static ArgumentParser addArguments() {
+
+        ArgumentParser parser = ArgumentParsers
+                .newArgumentParser("end-to-end-latency")
+                .defaultHelp(true)
+                .description("This tool records the average end to end latency for a single message to travel through Kafka");
+
+
+        parser
+                .addArgument("-b", "--brokers")
+                .action(store())
+                .type(String.class)
+                .dest("broker_list")
+                .help("The location of the bootstrap broker for both the producer and the consumer");
+
+        parser
+                .addArgument("-t", "--topic")
+                .action(store())
+                .type(String.class)
+                .dest("topic")
+                .help("The Kakfa topic to send/receive messages to/from");
+
+        parser
+                .addArgument("-n", "--num-records")
+                .action(store())
+                .type(Integer.class)
+                .dest("num_messages")
+                .help("The number of messages to send");
+
+        parser
+                .addArgument("-a", "--acks")
+                .action(store())
+                .type(String.class)
+                .dest("producer_acks")
+                .help("The number of messages to send");
+
+        parser
+                .addArgument("-s", "--message-bytes")
+                .required(true)
+                .action(store())
+                .type(Integer.class)
+                .dest("message_size_bytes")
+                .help("Size of each message in bytes");
+
+        parser
+                .addArgument("-f", "--properties-file")
+                .action(store())
+                .type(String.class)
+                .dest("properties_file");
+
+        return parser;
+    }
+
+    // Visible for testing
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+        if (records.isEmpty()) {
+            consumer.commitSync();
+            throw new RuntimeException("poll() timed out before finding a result (timeout:[" + TIMEOUT + "])");
+        }
+
+        //Check result matches the original record
+        String sent = new String(message, StandardCharsets.UTF_8);
+        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+
+        if (!read.equals(sent)) {
+            consumer.commitSync();
+            throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
+        }
+
+        //Check we only got the one message
+        if (records.count() != 1) {
+            int count = records.count();
+            consumer.commitSync();
+            throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
+        }
+    }
+
+    private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+        List<TopicPartition> topicPartitions = consumer.
+                partitionsFor(topic).
+                stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                .collect(Collectors.toList());
+        consumer.assign(topicPartitions);
+        consumer.seekToEnd(topicPartitions);
+        consumer.assignment().forEach(consumer::position);
+    }
+
+    private static void printResults(int numMessages, double totalTime, long[] latencies) {
+        System.out.printf("Avg latency: %.4f ms\n", totalTime / numMessages / 1000.0 / 1000.0);
+        Arrays.sort(latencies);
+        int p50 = (int) latencies[(int) (latencies.length * 0.5)];
+        int p99 = (int) latencies[(int) (latencies.length * 0.99)];
+        int p999 = (int) latencies[(int) (latencies.length * 0.999)];
+        System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d", p50, p99, p999);
+    }
+
+    private static byte[] randomBytesOfLen(Random random, int length) {
+        byte[] randomBytes = new byte[length];
+        Arrays.fill(randomBytes, new Integer(random.nextInt(26) + 65).byteValue());

Review Comment:
   It looks like this `Integer` constructor is deprecated; you should use `Integer.valueOf` instead.



##########
tests/kafkatest/services/performance/end_to_end_latency.py:
##########
@@ -87,7 +87,7 @@ def start_cmd(self, node):
         cmd = "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % EndToEndLatencyService.LOG4J_CONFIG
         if node.version.consumer_supports_bootstrap_server():
             cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s %(java_class_name)s " % args
-            cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(config_file)s" % args
+            cmd += "-b %(bootstrap_servers)s -t %(topic)s -n %(num_records)d -a %(acks)d -s %(message_bytes)d -f %(config_file)s" % args

Review Comment:
   This completely changes the interface of this tool, not just the package name. Why do we need to introduce `argparse4j` if it wasn't used in the original code? I would stay as close as possible to the original.



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
+public class EndToEndLatency {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+    private final static long TIMEOUT = 60000;
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    // Visible for testing
+    static void execute(String... args) throws Exception {
+

Review Comment:
   Can we get rid of these extra lines at the start/end of methods? There are quite a few of them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org