Posted to jira@kafka.apache.org by "Hangleton (via GitHub)" <gi...@apache.org> on 2023/02/07 15:53:02 UTC

[GitHub] [kafka] Hangleton opened a new pull request, #13214: [KAFKA-14577] Move the scala ConsoleProducer from core to tools module

Hangleton opened a new pull request, #13214:
URL: https://github.com/apache/kafka/pull/13214

   Code move for the `ConsoleProducer` as part of KAFKA-14525 - Move CLI tools from `core` to `tools` module. The copy stays as close to the original class as possible and provides identical functionality. The unit tests for `ConsoleProducer` and `LineMessageReader` are moved as well, and one minor additional unit test is added for the console producer.
   
   Tests:
   - Refactored unit tests `ConsoleProducerTest` and `LineMessageReaderTest` (see the sketch after the usage output below).
   - Manual exercise of the CLI tool.
   
   ```
   Option                                   Description                            
   ------                                   -----------                            
   --batch-size <Integer: size>             Number of messages to send in a single 
                                              batch if they are not being sent     
                                              synchronously. please note that this 
                                              option will be replaced if max-      
                                              partition-memory-bytes is also set   
                                              (default: 16384)                     
   --bootstrap-server <String: server to    REQUIRED unless --broker-list          
     connect to>                              (deprecated) is specified. The server
                                              (s) to connect to. The broker list   
                                              string in the form HOST1:PORT1,HOST2:
                                              PORT2.                               
   --broker-list <String: broker-list>      DEPRECATED, use --bootstrap-server     
                                              instead; ignored if --bootstrap-     
                                              server is specified.  The broker     
                                              list string in the form HOST1:PORT1, 
                                              HOST2:PORT2.                         
   --compression-codec [String:             The compression codec: either 'none',  
     compression-codec]                       'gzip', 'snappy', 'lz4', or 'zstd'.  
                                              If specified without value, then it  
                                              defaults to 'gzip'                   
   --help                                   Print usage information.               
   --line-reader <String: reader_class>     The class name of the class to use for 
                                              reading lines from standard in. By   
                                              default each line is read as a       
                                              separate message. (default: org.     
                                              apache.kafka.tools.                  
                                              ConsoleProducer$LineMessageReader)   
   --max-block-ms <Long: max block on       The max time that the producer will    
     send>                                    block for during a send request.     
                                              (default: 60000)                     
   --max-memory-bytes <Long: total memory   The total memory used by the producer  
     in bytes>                                to buffer records waiting to be sent 
                                              to the server. This is the option to 
                                              control `buffer.memory` in producer  
                                              configs. (default: 33554432)         
   --max-partition-memory-bytes <Integer:   The buffer size allocated for a        
     memory in bytes per partition>           partition. When records are received 
                                              which are smaller than this size the 
                                              producer will attempt to             
                                              optimistically group them together   
                                              until this size is reached. This is  
                                              the option to control `batch.size`   
                                              in producer configs. (default: 16384)
   --message-send-max-retries <Integer>     Brokers can fail receiving the message 
                                              for multiple reasons, and being      
                                              unavailable transiently is just one  
                                              of them. This property specifies the 
                                              number of retries before the         
                                              producer give up and drop this       
                                              message. This is the option to       
                                              control `retries` in producer        
                                              configs. (default: 3)                
   --metadata-expiry-ms <Long: metadata     The period of time in milliseconds     
     expiration interval>                     after which we force a refresh of    
                                              metadata even if we haven't seen any 
                                              leadership changes. This is the      
                                              option to control `metadata.max.age. 
                                              ms` in producer configs. (default:   
                                              300000)                              
   --producer-property <String:             A mechanism to pass user-defined       
     producer_prop>                           properties in the form key=value to  
                                              the producer.                        
   --producer.config <String: config file>  Producer config properties file. Note  
                                              that [producer-property] takes       
                                              precedence over this config.         
   --property <String: prop>                A mechanism to pass user-defined       
                                              properties in the form key=value to  
                                              the message reader. This allows      
                                              custom configuration for a user-     
                                              defined message reader.Default       
                                              properties include:                  
                                             parse.key=false                       
                                             parse.headers=false                   
                                             ignore.error=false                    
                                             key.separator=\t                      
                                             headers.delimiter=\t                  
                                             headers.separator=,                   
                                             headers.key.separator=:               
                                             null.marker=   When set, any fields   
                                              (key, value and headers) equal to    
                                              this will be replaced by null        
                                            Default parsing pattern when:          
                                             parse.headers=true and parse.key=true:
                                              "h1:v1,h2:v2...\tkey\tvalue"         
                                             parse.key=true:                       
                                              "key\tvalue"                         
                                             parse.headers=true:                   
                                              "h1:v1,h2:v2...\tvalue"              
   --reader-config <String: config file>    Config properties file for the message 
                                              reader. Note that [property] takes   
                                              precedence over this config.         
   --request-required-acks <String:         The required `acks` of the producer    
     request required acks>                   requests (default: -1)               
   --request-timeout-ms <Integer: request   The ack timeout of the producer        
     timeout ms>                              requests. Value must be non-negative 
                                              and non-zero. (default: 1500)        
   --retry-backoff-ms <Long>                Before each retry, the producer        
                                              refreshes the metadata of relevant   
                                              topics. Since leader election takes  
                                              a bit of time, this property         
                                              specifies the amount of time that    
                                              the producer waits before refreshing 
                                              the metadata. This is the option to  
                                              control `retry.backoff.ms` in        
                                              producer configs. (default: 100)     
   --socket-buffer-size <Integer: size>     The size of the tcp RECV size. This is 
                                              the option to control `send.buffer.  
                                              bytes` in producer configs.          
                                              (default: 102400)                    
   --sync                                   If set message send requests to the    
                                              brokers are synchronously, one at a  
                                              time as they arrive.                 
   --timeout <Long: timeout_ms>             If set and the producer is running in  
                                              asynchronous mode, this gives the    
                                              maximum amount of time a message     
                                              will queue awaiting sufficient batch 
                                              size. The value is given in ms. This 
                                              is the option to control `linger.ms` 
                                              in producer configs. (default: 1000) 
   --topic <String: topic>                  REQUIRED: The topic id to produce      
                                              messages to.                         
   --version                                Display Kafka version.
   ```
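   
   For reference, the sketch below (not the actual refactored test) shows how the moved config class can be unit-tested in isolation. It assumes the test sits in the `org.apache.kafka.tools` package, since the property accessors are package-private.
   
   ```java
   package org.apache.kafka.tools;
   
   import org.junit.jupiter.api.Test;
   
   import java.util.Properties;
   
   import static org.junit.jupiter.api.Assertions.assertEquals;
   
   public class ConsoleProducerConfigSketchTest {
       @Test
       public void mapsCliOptionsToProducerAndReaderProperties() throws Exception {
           // Parse a representative command line.
           ConsoleProducer.ConsoleProducerConfig config = new ConsoleProducer.ConsoleProducerConfig(
                   new String[]{"--bootstrap-server", "localhost:9092", "--topic", "t1", "--property", "parse.key=true"});
   
           // --bootstrap-server maps to the producer's bootstrap.servers setting.
           Properties producerProps = config.getProducerProps();
           assertEquals("localhost:9092", producerProps.getProperty("bootstrap.servers"));
           // Without --compression-codec, the codec defaults to 'none'.
           assertEquals("none", producerProps.getProperty("compression.type"));
   
           // Reader properties carry the target topic and any --property overrides.
           Properties readerProps = config.getReaderProps();
           assertEquals("t1", readerProps.getProperty("topic"));
           assertEquals("true", readerProps.getProperty("parse.key"));
       }
   }
   ```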
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




[GitHub] [kafka] Hangleton commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1104427399


##########
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * Typical implementations of this interface convert data from an {@link InputStream} received via
+ * {@link MessageReader#init(InputStream, Properties)} into a {@link ProducerRecord} instance on each
+ * invocation of `{@link MessageReader#readMessage()}`.
+ *
+ * This is used by the {@link ConsoleProducer}.
+ */
+public interface MessageReader {

Review Comment:
   Found the following impacted readers:
   
   - [ProtobufMessageReader](https://github.com/confluentinc/schema-registry/blob/master/protobuf-serializer/src/main/java/io/confluent/kafka/formatter/protobuf/ProtobufMessageReader.java)
   - [AvroMessageReader](https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/formatter/AvroMessageReader.java)
   - [JsonSchemaMessageReader](https://github.com/confluentinc/schema-registry/blob/master/json-schema-serializer/src/main/java/io/confluent/kafka/formatter/json/JsonSchemaMessageReader.java)
   
   All of them share the parent class [SchemaMessageReader](https://github.com/confluentinc/schema-registry/blob/master/schema-serializer/src/main/java/io/confluent/kafka/formatter/SchemaMessageReader.java).
   
   There is also a minor reference in the documentation for the [connect-streams-pipeline](https://github.com/confluentinc/examples/blob/7.3.1-post/connect-streams-pipeline/docs/index.rst).
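   
   To make the compatibility surface concrete: an external reader implements the interface roughly as in the sketch below. This is a hypothetical class written only against the two methods named in the new interface's javadoc (`init` and `readMessage`), and it assumes `close()` keeps a default no-op implementation as in the original trait.
   
   ```java
   package io.example.formatter; // hypothetical third-party package
   
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.tools.MessageReader;
   
   import java.io.BufferedReader;
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.InputStreamReader;
   import java.io.UncheckedIOException;
   import java.nio.charset.StandardCharsets;
   import java.util.Properties;
   
   public class UpperCaseMessageReader implements MessageReader {
       private BufferedReader reader;
       private String topic;
   
       @Override
       public void init(InputStream inputStream, Properties props) {
           reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
           topic = props.getProperty("topic"); // ConsoleProducer always sets "topic"
       }
   
       @Override
       public ProducerRecord<byte[], byte[]> readMessage() {
           try {
               String line = reader.readLine();
               if (line == null) {
                   return null; // a null record ends ConsoleProducer's produce loop
               }
               return new ProducerRecord<>(topic, line.toUpperCase().getBytes(StandardCharsets.UTF_8));
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   }
   ```
   
   All such classes would need to be recompiled against the new package once the Scala trait in `core` goes away.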





[GitHub] [kafka] dajac commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1103557897


##########
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##########
+public interface MessageReader {

Review Comment:
   If we want to eventually change the package name, we would need a KIP.
   
   Overall, I feel like there are a few compatibility issues with the migration of those tools, so a small KIP may be good anyway to discuss how we want to address all of them together instead of discussing them in every PR. What do you think?
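   
   For illustration only (this is not something proposed in this thread): one common way to bridge such a package move, sketched below, is to keep the old type as a deprecated alias of the new interface, so existing implementations keep compiling and remain loadable by the tool.
   
   ```java
   // Sketch of a possible compatibility bridge; whether and how to do this
   // would be part of the KIP discussion.
   package kafka.common;
   
   /**
    * Kept for compatibility: implementations of this old interface are also
    * instances of org.apache.kafka.tools.MessageReader, so ConsoleProducer
    * can still instantiate them via --line-reader.
    */
   @Deprecated
   public interface MessageReader extends org.apache.kafka.tools.MessageReader {
   }
   ```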





[GitHub] [kafka] fvaleri commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1101716594


##########
server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java:
##########
@@ -16,17 +16,29 @@
  */
 package org.apache.kafka.server.util;
 
+import joptsimple.OptionParser;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.PrintStream;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static java.util.Arrays.stream;
+
 public class ToolsUtils {
 
+    public static void validatePortOrDie(OptionParser parser, String hostPort) {

Review Comment:
   ```suggestion
       public static void validatePortOrExit(OptionParser parser, String hostPort) {
   ```
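   
   For context, a hedged sketch of what this helper does regardless of its name (the method body is not quoted in this hunk, so details may differ): every comma-separated HOST:PORT entry is validated, and the OptionParser is only needed so usage can be printed before exiting.
   
   ```java
   import joptsimple.OptionParser;
   import org.apache.kafka.common.utils.Utils;
   import org.apache.kafka.server.util.CommandLineUtils;
   
   import static java.util.Arrays.stream;
   
   public final class PortValidationSketch {
       // Sketch only; the actual implementation in this PR may differ.
       public static void validatePortOrExit(OptionParser parser, String hostPort) {
           // Utils.getPort returns null for entries that do not parse as HOST:PORT.
           boolean allValid = hostPort != null
                   && stream(hostPort.split(",")).allMatch(entry -> Utils.getPort(entry) != null);
           if (!allValid) {
               CommandLineUtils.printUsageAndExit(parser, "Please provide a valid broker list such as host1:9092,host2:9093");
           }
       }
   }
   ```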



##########
tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java:
##########
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static java.util.Arrays.stream;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.METADATA_MAX_AGE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.SEND_BUFFER_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.utils.Utils.loadProps;
+import static org.apache.kafka.server.util.CommandLineUtils.maybeMergeOptions;
+import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs;
+
+/**
+ * Sends {@link ProducerRecord} generated from lines read on the standard input.
+ */
+public class ConsoleProducer {
+    public static void main(String[] args) {
+        ConsoleProducer consoleProducer = new ConsoleProducer();
+        consoleProducer.start(args);
+    }
+
+    void start(String[] args) {
+        try {
+            ConsoleProducerConfig config = new ConsoleProducerConfig(args);
+            MessageReader reader = createMessageReader(config);
+            reader.init(System.in, config.getReaderProps());
+
+            KafkaProducer<byte[], byte[]> producer = createKafkaProducer(config.getProducerProps());
+            Exit.addShutdownHook("producer-shutdown-hook", producer::close);
+
+            ProducerRecord<byte[], byte[]> record;
+            do {
+                record = reader.readMessage();
+                if (record != null) {
+                    send(producer, record, config.sync());
+                }
+            } while (record != null);
+
+        } catch (OptionException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Exit.exit(1);
+        }
+        Exit.exit(0);
+    }
+
+    // VisibleForTesting
+    KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
+        return new KafkaProducer<>(props);
+    }
+
+    // VisibleForTesting
+    MessageReader createMessageReader(ConsoleProducerConfig config) throws ReflectiveOperationException {
+        return (MessageReader) Class.forName(config.readerClass()).getDeclaredConstructor().newInstance();
+    }
+
+    private void send(KafkaProducer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record, boolean sync) throws Exception {
+        if (sync) {
+            producer.send(record).get();
+        } else {
+            producer.send(record, new ErrorLoggingCallback(record.topic(), record.key(), record.value(), false));
+        }
+    }
+
+    public static class ConsoleProducerConfig extends CommandDefaultOptions {
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<Void> syncOpt;
+        private final OptionSpec<String> compressionCodecOpt;
+        private final OptionSpec<Integer> batchSizeOpt;
+        private final OptionSpec<Integer> messageSendMaxRetriesOpt;
+        private final OptionSpec<Long> retryBackoffMsOpt;
+        private final OptionSpec<Long> sendTimeoutOpt;
+        private final OptionSpec<String> requestRequiredAcksOpt;
+        private final OptionSpec<Integer> requestTimeoutMsOpt;
+        private final OptionSpec<Long> metadataExpiryMsOpt;
+        private final OptionSpec<Long> maxBlockMsOpt;
+        private final OptionSpec<Long> maxMemoryBytesOpt;
+        private final OptionSpec<Integer> maxPartitionMemoryBytesOpt;
+        private final OptionSpec<String> messageReaderOpt;
+        private final OptionSpec<Integer> socketBufferSizeOpt;
+        private final OptionSpec<String> propertyOpt;
+        private final OptionSpec<String> readerConfigOpt;
+        private final OptionSpec<String> producerPropertyOpt;
+        private final OptionSpec<String> producerConfigOpt;
+
+        public ConsoleProducerConfig(String[] args) {
+            super(args);
+            topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified.  The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("broker-list")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .requiredUnless("broker-list")
+                    .withRequiredArg()
+                    .describedAs("server to connect to")
+                    .ofType(String.class);
+            syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.");
+            compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'." +
+                            "If specified without value, then it defaults to 'gzip'")
+                    .withOptionalArg()
+                    .describedAs("compression-codec")
+                    .ofType(String.class);
+            batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously. " +
+                            "please note that this option will be replaced if max-partition-memory-bytes is also set")
+                    .withRequiredArg()
+                    .describedAs("size")
+                    .ofType(Integer.class)
+                    .defaultsTo(16 * 1024);
+            messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, " +
+                            "and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message. " +
+                            "This is the option to control `retries` in producer configs.")
+                    .withRequiredArg()
+                    .ofType(Integer.class)
+                    .defaultsTo(3);
+            retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. " +
+                            "Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. " +
+                            "This is the option to control `retry.backoff.ms` in producer configs.")
+                    .withRequiredArg()
+                    .ofType(Long.class)
+                    .defaultsTo(100L);
+            sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
+                            " a message will queue awaiting sufficient batch size. The value is given in ms. " +
+                            "This is the option to control `linger.ms` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("timeout_ms")
+                    .ofType(Long.class)
+                    .defaultsTo(1000L);
+            requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required `acks` of the producer requests")
+                    .withRequiredArg()
+                    .describedAs("request required acks")
+                    .ofType(String.class)
+                    .defaultsTo("-1");
+            requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero.")
+                    .withRequiredArg()
+                    .describedAs("request timeout ms")
+                    .ofType(Integer.class)
+                    .defaultsTo(1500);
+            metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms",
+                            "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. " +
+                            "This is the option to control `metadata.max.age.ms` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("metadata expiration interval")
+                    .ofType(Long.class)
+                    .defaultsTo(5 * 60 * 1000L);
+            maxBlockMsOpt = parser.accepts("max-block-ms",
+                            "The max time that the producer will block for during a send request.")
+                    .withRequiredArg()
+                    .describedAs("max block on send")
+                    .ofType(Long.class)
+                    .defaultsTo(60 * 1000L);
+            maxMemoryBytesOpt = parser.accepts("max-memory-bytes",
+                            "The total memory used by the producer to buffer records waiting to be sent to the server. " +
+                            "This is the option to control `buffer.memory` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("total memory in bytes")
+                    .ofType(Long.class)
+                    .defaultsTo(32 * 1024 * 1024L);
+            maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes",
+                            "The buffer size allocated for a partition. When records are received which are smaller than this size the producer " +
+                            "will attempt to optimistically group them together until this size is reached. " +
+                            "This is the option to control `batch.size` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("memory in bytes per partition")
+                    .ofType(Integer.class)
+                    .defaultsTo(16 * 1024);
+            messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
+                            "By default each line is read as a separate message.")
+                    .withRequiredArg()
+                    .describedAs("reader_class")
+                    .ofType(String.class)
+                    .defaultsTo(LineMessageReader.class.getName());
+            socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size. " +
+                            "This is the option to control `send.buffer.bytes` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("size")
+                    .ofType(Integer.class)
+                    .defaultsTo(1024 * 100);
+            propertyOpt = parser.accepts("property",
+                            "A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader." +
+                            "Default properties include:" +
+                            "\n parse.key=false" +
+                            "\n parse.headers=false" +
+                            "\n ignore.error=false" +
+                            "\n key.separator=\\t" +
+                            "\n headers.delimiter=\\t" +
+                            "\n headers.separator=," +
+                            "\n headers.key.separator=:" +
+                            "\n null.marker=   When set, any fields (key, value and headers) equal to this will be replaced by null" +
+                            "\nDefault parsing pattern when:" +
+                            "\n parse.headers=true and parse.key=true:" +
+                            "\n  \"h1:v1,h2:v2...\\tkey\\tvalue\"" +
+                            "\n parse.key=true:" +
+                            "\n  \"key\\tvalue\"" +
+                            "\n parse.headers=true:" +
+                            "\n  \"h1:v1,h2:v2...\\tvalue\"")
+                    .withRequiredArg()
+                    .describedAs("prop")
+                    .ofType(String.class);
+            readerConfigOpt = parser.accepts("reader-config", "Config properties file for the message reader. Note that " + propertyOpt + " takes precedence over this config.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer. ")
+                    .withRequiredArg()
+                    .describedAs("producer_prop")
+                    .ofType(String.class);
+            producerConfigOpt = parser.accepts("producer.config", "Producer config properties file. Note that " + producerPropertyOpt + " takes precedence over this config.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+
+            try {
+                options = parser.parse(args);
+
+            } catch (OptionException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from standard input and publish it to Kafka.");
+            CommandLineUtils.checkRequiredArgs(parser, options, topicOpt);
+
+            ToolsUtils.validatePortOrDie(parser, brokerHostsAndPorts());
+        }
+
+        String brokerHostsAndPorts() {
+            return options.has(bootstrapServerOpt) ? options.valueOf(bootstrapServerOpt) : options.valueOf(brokerListOpt);
+        }
+
+        boolean sync() {
+            return options.has(syncOpt);
+        }
+
+        String compressionCodec() {
+            if (options.has(compressionCodecOpt)) {
+                String codecOptValue = options.valueOf(compressionCodecOpt);
+                // Defaults to gzip if no value is provided.
+                return codecOptValue == null || codecOptValue.isEmpty() ? CompressionType.GZIP.name : codecOptValue;
+            }
+
+            return CompressionType.NONE.name;
+        }
+
+        String readerClass() {
+            return options.valueOf(messageReaderOpt);
+        }
+
+        Properties getReaderProps() throws IOException {
+            Properties properties = new Properties();
+
+            if (options.has(readerConfigOpt)) {
+                properties.putAll(loadProps(options.valueOf(readerConfigOpt)));
+            }
+
+            properties.put("topic", options.valueOf(topicOpt));
+            properties.putAll(parseKeyValueArgs(options.valuesOf(propertyOpt)));
+            return properties;
+        }
+
+        Properties getProducerProps() throws IOException {
+            Properties properties = new Properties();
+
+            if (options.has(producerConfigOpt)) {
+                properties.putAll(loadProps(options.valueOf(producerConfigOpt)));
+            }
+
+            properties.putAll(parseKeyValueArgs(options.valuesOf(producerPropertyOpt)));
+            properties.put(BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts());
+            properties.put(COMPRESSION_TYPE_CONFIG, compressionCodec());
+            properties.putIfAbsent(CLIENT_ID_CONFIG, "console-producer");
+            properties.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+            properties.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+            maybeMergeOptions(properties, LINGER_MS_CONFIG, options, sendTimeoutOpt);
+            maybeMergeOptions(properties, ACKS_CONFIG, options, requestRequiredAcksOpt);
+            maybeMergeOptions(properties, REQUEST_TIMEOUT_MS_CONFIG, options, requestTimeoutMsOpt);
+            maybeMergeOptions(properties, RETRIES_CONFIG, options, messageSendMaxRetriesOpt);
+            maybeMergeOptions(properties, RETRY_BACKOFF_MS_CONFIG, options, retryBackoffMsOpt);
+            maybeMergeOptions(properties, SEND_BUFFER_CONFIG, options, socketBufferSizeOpt);
+            maybeMergeOptions(properties, BUFFER_MEMORY_CONFIG, options, maxMemoryBytesOpt);
+            // We currently have 2 options to set the batch.size value. We'll deprecate/remove one of them in KIP-717.
+            maybeMergeOptions(properties, BATCH_SIZE_CONFIG, options, batchSizeOpt);
+            maybeMergeOptions(properties, BATCH_SIZE_CONFIG, options, maxPartitionMemoryBytesOpt);
+            maybeMergeOptions(properties, METADATA_MAX_AGE_CONFIG, options, metadataExpiryMsOpt);
+            maybeMergeOptions(properties, MAX_BLOCK_MS_CONFIG, options, maxBlockMsOpt);
+
+            return properties;
+        }
+    }
+
+    static final class LineMessageReader implements MessageReader {

Review Comment:
   Given that there is no hard dependency on the parent class and there is a dedicated test, I would suggest putting LineMessageReader in a separate file. Also note that the NPath complexity lives here, so this would be the class to suppress in checkstyle.



##########
tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java:
##########
+            producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer. ")

Review Comment:
   I think we can get rid of the final extra space.



##########
tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java:
##########
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static java.util.Arrays.stream;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.METADATA_MAX_AGE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.SEND_BUFFER_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.utils.Utils.loadProps;
+import static org.apache.kafka.server.util.CommandLineUtils.maybeMergeOptions;
+import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs;
+
+/**
+ * Sends {@link ProducerRecord} instances generated from lines read from standard input.
+ */
+public class ConsoleProducer {
+    public static void main(String[] args) {
+        ConsoleProducer consoleProducer = new ConsoleProducer();
+        consoleProducer.start(args);
+    }
+
+    void start(String[] args) {
+        try {
+            ConsoleProducerConfig config = new ConsoleProducerConfig(args);
+            MessageReader reader = createMessageReader(config);
+            reader.init(System.in, config.getReaderProps());
+
+            KafkaProducer<byte[], byte[]> producer = createKafkaProducer(config.getProducerProps());
+            Exit.addShutdownHook("producer-shutdown-hook", producer::close);
+
+            ProducerRecord<byte[], byte[]> record;
+            do {
+                record = reader.readMessage();
+                if (record != null) {
+                    send(producer, record, config.sync());
+                }
+            } while (record != null);
+
+        } catch (OptionException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Exit.exit(1);
+        }
+        Exit.exit(0);
+    }
+
+    // VisibleForTesting
+    KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
+        return new KafkaProducer<>(props);
+    }
+
+    // VisibleForTesting
+    MessageReader createMessageReader(ConsoleProducerConfig config) throws ReflectiveOperationException {
+        return (MessageReader) Class.forName(config.readerClass()).getDeclaredConstructor().newInstance();
+    }
+
+    private void send(KafkaProducer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record, boolean sync) throws Exception {
+        if (sync) {
+            producer.send(record).get();
+        } else {
+            producer.send(record, new ErrorLoggingCallback(record.topic(), record.key(), record.value(), false));
+        }
+    }
+
+    public static class ConsoleProducerConfig extends CommandDefaultOptions {
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<Void> syncOpt;
+        private final OptionSpec<String> compressionCodecOpt;
+        private final OptionSpec<Integer> batchSizeOpt;
+        private final OptionSpec<Integer> messageSendMaxRetriesOpt;
+        private final OptionSpec<Long> retryBackoffMsOpt;
+        private final OptionSpec<Long> sendTimeoutOpt;
+        private final OptionSpec<String> requestRequiredAcksOpt;
+        private final OptionSpec<Integer> requestTimeoutMsOpt;
+        private final OptionSpec<Long> metadataExpiryMsOpt;
+        private final OptionSpec<Long> maxBlockMsOpt;
+        private final OptionSpec<Long> maxMemoryBytesOpt;
+        private final OptionSpec<Integer> maxPartitionMemoryBytesOpt;
+        private final OptionSpec<String> messageReaderOpt;
+        private final OptionSpec<Integer> socketBufferSizeOpt;
+        private final OptionSpec<String> propertyOpt;
+        private final OptionSpec<String> readerConfigOpt;
+        private final OptionSpec<String> producerPropertyOpt;
+        private final OptionSpec<String> producerConfigOpt;
+
+        public ConsoleProducerConfig(String[] args) {
+            super(args);
+            topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("broker-list")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list (deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .requiredUnless("broker-list")
+                    .withRequiredArg()
+                    .describedAs("server to connect to")
+                    .ofType(String.class);
+            syncOpt = parser.accepts("sync", "If set, message send requests to the brokers are sent synchronously, one at a time as they arrive.");
+            compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. " +
+                            "If specified without value, then it defaults to 'gzip'.")
+                    .withOptionalArg()
+                    .describedAs("compression-codec")
+                    .ofType(String.class);
+            batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously. " +
+                            "please note that this option will be replaced if max-partition-memory-bytes is also set")
+                    .withRequiredArg()
+                    .describedAs("size")
+                    .ofType(Integer.class)
+                    .defaultsTo(16 * 1024);
+            messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, " +
+                            "and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message. " +
+                            "This is the option to control `retries` in producer configs.")
+                    .withRequiredArg()
+                    .ofType(Integer.class)
+                    .defaultsTo(3);
+            retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. " +
+                            "Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. " +
+                            "This is the option to control `retry.backoff.ms` in producer configs.")
+                    .withRequiredArg()
+                    .ofType(Long.class)
+                    .defaultsTo(100L);
+            sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
+                            " a message will queue awaiting sufficient batch size. The value is given in ms. " +
+                            "This is the option to control `linger.ms` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("timeout_ms")
+                    .ofType(Long.class)
+                    .defaultsTo(1000L);
+            requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required `acks` of the producer requests")
+                    .withRequiredArg()
+                    .describedAs("request required acks")
+                    .ofType(String.class)
+                    .defaultsTo("-1");
+            requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero.")
+                    .withRequiredArg()
+                    .describedAs("request timeout ms")
+                    .ofType(Integer.class)
+                    .defaultsTo(1500);
+            metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms",
+                            "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. " +
+                            "This is the option to control `metadata.max.age.ms` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("metadata expiration interval")
+                    .ofType(Long.class)
+                    .defaultsTo(5 * 60 * 1000L);
+            maxBlockMsOpt = parser.accepts("max-block-ms",
+                            "The max time that the producer will block for during a send request.")
+                    .withRequiredArg()
+                    .describedAs("max block on send")
+                    .ofType(Long.class)
+                    .defaultsTo(60 * 1000L);
+            maxMemoryBytesOpt = parser.accepts("max-memory-bytes",
+                            "The total memory used by the producer to buffer records waiting to be sent to the server. " +
+                            "This is the option to control `buffer.memory` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("total memory in bytes")
+                    .ofType(Long.class)
+                    .defaultsTo(32 * 1024 * 1024L);
+            maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes",
+                            "The buffer size allocated for a partition. When records are received which are smaller than this size the producer " +
+                            "will attempt to optimistically group them together until this size is reached. " +
+                            "This is the option to control `batch.size` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("memory in bytes per partition")
+                    .ofType(Integer.class)
+                    .defaultsTo(16 * 1024);
+            messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
+                            "By default each line is read as a separate message.")
+                    .withRequiredArg()
+                    .describedAs("reader_class")
+                    .ofType(String.class)
+                    .defaultsTo(LineMessageReader.class.getName());
+            socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the TCP RECV buffer. " +
+                            "This is the option to control `send.buffer.bytes` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("size")
+                    .ofType(Integer.class)
+                    .defaultsTo(1024 * 100);
+            propertyOpt = parser.accepts("property",
+                            "A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader." +
+                            "Default properties include:" +
+                            "\n parse.key=false" +
+                            "\n parse.headers=false" +
+                            "\n ignore.error=false" +
+                            "\n key.separator=\\t" +
+                            "\n headers.delimiter=\\t" +
+                            "\n headers.separator=," +
+                            "\n headers.key.separator=:" +
+                            "\n null.marker=   When set, any fields (key, value and headers) equal to this will be replaced by null" +
+                            "\nDefault parsing pattern when:" +
+                            "\n parse.headers=true and parse.key=true:" +
+                            "\n  \"h1:v1,h2:v2...\\tkey\\tvalue\"" +
+                            "\n parse.key=true:" +
+                            "\n  \"key\\tvalue\"" +
+                            "\n parse.headers=true:" +
+                            "\n  \"h1:v1,h2:v2...\\tvalue\"")
+                    .withRequiredArg()
+                    .describedAs("prop")
+                    .ofType(String.class);
+            readerConfigOpt = parser.accepts("reader-config", "Config properties file for the message reader. Note that " + propertyOpt + " takes precedence over this config.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer. ")
+                    .withRequiredArg()
+                    .describedAs("producer_prop")
+                    .ofType(String.class);
+            producerConfigOpt = parser.accepts("producer.config", "Producer config properties file. Note that " + producerPropertyOpt + " takes precedence over this config.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+
+            try {
+                options = parser.parse(args);
+
+            } catch (OptionException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from standard input and publish it to Kafka.");
+            CommandLineUtils.checkRequiredArgs(parser, options, topicOpt);
+
+            ToolsUtils.validatePortOrDie(parser, brokerHostsAndPorts());

Review Comment:
   ```suggestion
               ToolsUtils.validatePortOrExit(parser, brokerHostsAndPorts());
   ```



##########
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * Typical implementations of this interface convert data from an {@link InputStream} received via
+ * {@link MessageReader#init(InputStream, Properties)} into a {@link ProducerRecord} instance on each
+ * invocation of {@link MessageReader#readMessage()}.
+ *
+ * This is used by the {@link ConsoleProducer}.
+ */
+public interface MessageReader {

Review Comment:
   This would be a breaking change, because there is an option that allows users to provide their own message reader implementation.
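   For context, `--line-reader` instantiates any class implementing this interface by reflection, so its package and shape are part of the tool's public contract. A minimal sketch of such a user-defined reader (the class name and the `fixed.key` property are illustrative, not part of the PR):
   ```java
   package com.example; // hypothetical package

   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.KafkaException;
   import org.apache.kafka.tools.MessageReader;

   import java.io.BufferedReader;
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.InputStreamReader;
   import java.nio.charset.StandardCharsets;
   import java.util.Properties;

   // Reads each line as the record value and attaches a fixed key
   // supplied with --property fixed.key=...
   public class FixedKeyReader implements MessageReader {
       private String topic;
       private byte[] key;
       private BufferedReader reader;

       @Override
       public void init(InputStream inputStream, Properties props) {
           topic = props.getProperty("topic");
           key = props.getProperty("fixed.key", "default-key").getBytes(StandardCharsets.UTF_8);
           reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
       }

       @Override
       public ProducerRecord<byte[], byte[]> readMessage() {
           try {
               String line = reader.readLine();
               // Returning null signals end of input to the producer loop.
               return line == null ? null : new ProducerRecord<>(topic, key, line.getBytes(StandardCharsets.UTF_8));
           } catch (IOException e) {
               throw new KafkaException(e);
           }
       }
   }
   ```
   Moving or renaming the interface would break such implementations when the tool loads them by class name.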



##########
tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java:
##########
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static java.util.Arrays.stream;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.METADATA_MAX_AGE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.SEND_BUFFER_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.utils.Utils.loadProps;
+import static org.apache.kafka.server.util.CommandLineUtils.maybeMergeOptions;
+import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs;
+
+/**
+ * Sends {@link ProducerRecord} instances generated from lines read from standard input.
+ */
+public class ConsoleProducer {
+    public static void main(String[] args) {
+        ConsoleProducer consoleProducer = new ConsoleProducer();
+        consoleProducer.start(args);
+    }
+
+    void start(String[] args) {
+        try {
+            ConsoleProducerConfig config = new ConsoleProducerConfig(args);
+            MessageReader reader = createMessageReader(config);
+            reader.init(System.in, config.getReaderProps());
+
+            KafkaProducer<byte[], byte[]> producer = createKafkaProducer(config.getProducerProps());
+            Exit.addShutdownHook("producer-shutdown-hook", producer::close);
+
+            ProducerRecord<byte[], byte[]> record;
+            do {
+                record = reader.readMessage();
+                if (record != null) {
+                    send(producer, record, config.sync());
+                }
+            } while (record != null);
+
+        } catch (OptionException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Exit.exit(1);
+        }
+        Exit.exit(0);
+    }
+
+    // VisibleForTesting
+    KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
+        return new KafkaProducer<>(props);
+    }
+
+    // VisibleForTesting
+    MessageReader createMessageReader(ConsoleProducerConfig config) throws ReflectiveOperationException {
+        return (MessageReader) Class.forName(config.readerClass()).getDeclaredConstructor().newInstance();
+    }
+
+    private void send(KafkaProducer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record, boolean sync) throws Exception {
+        if (sync) {
+            producer.send(record).get();
+        } else {
+            producer.send(record, new ErrorLoggingCallback(record.topic(), record.key(), record.value(), false));
+        }
+    }
+
+    public static class ConsoleProducerConfig extends CommandDefaultOptions {
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<Void> syncOpt;
+        private final OptionSpec<String> compressionCodecOpt;
+        private final OptionSpec<Integer> batchSizeOpt;
+        private final OptionSpec<Integer> messageSendMaxRetriesOpt;
+        private final OptionSpec<Long> retryBackoffMsOpt;
+        private final OptionSpec<Long> sendTimeoutOpt;
+        private final OptionSpec<String> requestRequiredAcksOpt;
+        private final OptionSpec<Integer> requestTimeoutMsOpt;
+        private final OptionSpec<Long> metadataExpiryMsOpt;
+        private final OptionSpec<Long> maxBlockMsOpt;
+        private final OptionSpec<Long> maxMemoryBytesOpt;
+        private final OptionSpec<Integer> maxPartitionMemoryBytesOpt;
+        private final OptionSpec<String> messageReaderOpt;
+        private final OptionSpec<Integer> socketBufferSizeOpt;
+        private final OptionSpec<String> propertyOpt;
+        private final OptionSpec<String> readerConfigOpt;
+        private final OptionSpec<String> producerPropertyOpt;
+        private final OptionSpec<String> producerConfigOpt;
+
+        public ConsoleProducerConfig(String[] args) {
+            super(args);
+            topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("broker-list")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list (deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .requiredUnless("broker-list")
+                    .withRequiredArg()
+                    .describedAs("server to connect to")
+                    .ofType(String.class);
+            syncOpt = parser.accepts("sync", "If set, message send requests to the brokers are sent synchronously, one at a time as they arrive.");
+            compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. " +
+                            "If specified without value, then it defaults to 'gzip'.")
+                    .withOptionalArg()
+                    .describedAs("compression-codec")
+                    .ofType(String.class);
+            batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously. " +
+                            "please note that this option will be replaced if max-partition-memory-bytes is also set")
+                    .withRequiredArg()
+                    .describedAs("size")
+                    .ofType(Integer.class)
+                    .defaultsTo(16 * 1024);
+            messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, " +
+                            "and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message. " +
+                            "This is the option to control `retries` in producer configs.")
+                    .withRequiredArg()
+                    .ofType(Integer.class)
+                    .defaultsTo(3);
+            retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. " +
+                            "Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. " +
+                            "This is the option to control `retry.backoff.ms` in producer configs.")
+                    .withRequiredArg()
+                    .ofType(Long.class)
+                    .defaultsTo(100L);
+            sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
+                            " a message will queue awaiting sufficient batch size. The value is given in ms. " +
+                            "This is the option to control `linger.ms` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("timeout_ms")
+                    .ofType(Long.class)
+                    .defaultsTo(1000L);
+            requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required `acks` of the producer requests")
+                    .withRequiredArg()
+                    .describedAs("request required acks")
+                    .ofType(String.class)
+                    .defaultsTo("-1");
+            requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero.")
+                    .withRequiredArg()
+                    .describedAs("request timeout ms")
+                    .ofType(Integer.class)
+                    .defaultsTo(1500);
+            metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms",
+                            "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. " +
+                            "This is the option to control `metadata.max.age.ms` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("metadata expiration interval")
+                    .ofType(Long.class)
+                    .defaultsTo(5 * 60 * 1000L);
+            maxBlockMsOpt = parser.accepts("max-block-ms",
+                            "The max time that the producer will block for during a send request.")
+                    .withRequiredArg()
+                    .describedAs("max block on send")
+                    .ofType(Long.class)
+                    .defaultsTo(60 * 1000L);
+            maxMemoryBytesOpt = parser.accepts("max-memory-bytes",
+                            "The total memory used by the producer to buffer records waiting to be sent to the server. " +
+                            "This is the option to control `buffer.memory` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("total memory in bytes")
+                    .ofType(Long.class)
+                    .defaultsTo(32 * 1024 * 1024L);
+            maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes",
+                            "The buffer size allocated for a partition. When records are received which are smaller than this size the producer " +
+                            "will attempt to optimistically group them together until this size is reached. " +
+                            "This is the option to control `batch.size` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("memory in bytes per partition")
+                    .ofType(Integer.class)
+                    .defaultsTo(16 * 1024);
+            messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
+                            "By default each line is read as a separate message.")
+                    .withRequiredArg()
+                    .describedAs("reader_class")
+                    .ofType(String.class)
+                    .defaultsTo(LineMessageReader.class.getName());
+            socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the TCP RECV buffer. " +
+                            "This is the option to control `send.buffer.bytes` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("size")
+                    .ofType(Integer.class)
+                    .defaultsTo(1024 * 100);
+            propertyOpt = parser.accepts("property",
+                            "A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader." +
+                            "Default properties include:" +
+                            "\n parse.key=false" +
+                            "\n parse.headers=false" +
+                            "\n ignore.error=false" +
+                            "\n key.separator=\\t" +
+                            "\n headers.delimiter=\\t" +
+                            "\n headers.separator=," +
+                            "\n headers.key.separator=:" +
+                            "\n null.marker=   When set, any fields (key, value and headers) equal to this will be replaced by null" +
+                            "\nDefault parsing pattern when:" +
+                            "\n parse.headers=true and parse.key=true:" +
+                            "\n  \"h1:v1,h2:v2...\\tkey\\tvalue\"" +
+                            "\n parse.key=true:" +
+                            "\n  \"key\\tvalue\"" +
+                            "\n parse.headers=true:" +
+                            "\n  \"h1:v1,h2:v2...\\tvalue\"")
+                    .withRequiredArg()
+                    .describedAs("prop")
+                    .ofType(String.class);
+            readerConfigOpt = parser.accepts("reader-config", "Config properties file for the message reader. Note that " + propertyOpt + " takes precedence over this config.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer. ")
+                    .withRequiredArg()
+                    .describedAs("producer_prop")
+                    .ofType(String.class);
+            producerConfigOpt = parser.accepts("producer.config", "Producer config properties file. Note that " + producerPropertyOpt + " takes precedence over this config.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+
+            try {
+                options = parser.parse(args);
+
+            } catch (OptionException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from standard input and publish it to Kafka.");
+            CommandLineUtils.checkRequiredArgs(parser, options, topicOpt);
+
+            ToolsUtils.validatePortOrDie(parser, brokerHostsAndPorts());
+        }
+
+        String brokerHostsAndPorts() {
+            return options.has(bootstrapServerOpt) ? options.valueOf(bootstrapServerOpt) : options.valueOf(brokerListOpt);
+        }
+
+        boolean sync() {
+            return options.has(syncOpt);
+        }
+
+        String compressionCodec() {
+            if (options.has(compressionCodecOpt)) {
+                String codecOptValue = options.valueOf(compressionCodecOpt);
+                // Defaults to gzip if no value is provided.
+                return codecOptValue == null || codecOptValue.isEmpty() ? CompressionType.GZIP.name : codecOptValue;
+            }
+
+            return CompressionType.NONE.name;
+        }
+
+        String readerClass() {
+            return options.valueOf(messageReaderOpt);
+        }
+
+        Properties getReaderProps() throws IOException {
+            Properties properties = new Properties();
+
+            if (options.has(readerConfigOpt)) {
+                properties.putAll(loadProps(options.valueOf(readerConfigOpt)));
+            }
+
+            properties.put("topic", options.valueOf(topicOpt));
+            properties.putAll(parseKeyValueArgs(options.valuesOf(propertyOpt)));
+            return properties;
+        }
+
+        Properties getProducerProps() throws IOException {
+            Properties properties = new Properties();
+
+            if (options.has(producerConfigOpt)) {
+                properties.putAll(loadProps(options.valueOf(producerConfigOpt)));
+            }
+
+            properties.putAll(parseKeyValueArgs(options.valuesOf(producerPropertyOpt)));
+            properties.put(BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts());
+            properties.put(COMPRESSION_TYPE_CONFIG, compressionCodec());
+            properties.putIfAbsent(CLIENT_ID_CONFIG, "console-producer");
+            properties.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+            properties.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+            maybeMergeOptions(properties, LINGER_MS_CONFIG, options, sendTimeoutOpt);
+            maybeMergeOptions(properties, ACKS_CONFIG, options, requestRequiredAcksOpt);
+            maybeMergeOptions(properties, REQUEST_TIMEOUT_MS_CONFIG, options, requestTimeoutMsOpt);
+            maybeMergeOptions(properties, RETRIES_CONFIG, options, messageSendMaxRetriesOpt);
+            maybeMergeOptions(properties, RETRY_BACKOFF_MS_CONFIG, options, retryBackoffMsOpt);
+            maybeMergeOptions(properties, SEND_BUFFER_CONFIG, options, socketBufferSizeOpt);
+            maybeMergeOptions(properties, BUFFER_MEMORY_CONFIG, options, maxMemoryBytesOpt);
+            // We currently have 2 options to set the batch.size value. We'll deprecate/remove one of them in KIP-717.
+            maybeMergeOptions(properties, BATCH_SIZE_CONFIG, options, batchSizeOpt);
+            maybeMergeOptions(properties, BATCH_SIZE_CONFIG, options, maxPartitionMemoryBytesOpt);
+            maybeMergeOptions(properties, METADATA_MAX_AGE_CONFIG, options, metadataExpiryMsOpt);
+            maybeMergeOptions(properties, MAX_BLOCK_MS_CONFIG, options, maxBlockMsOpt);
+
+            return properties;
+        }
+    }
+
+    static final class LineMessageReader implements MessageReader {
+        private String topic;
+        private BufferedReader reader;
+        private boolean parseKey;
+        private String keySeparator = "\t";
+        private boolean parseHeaders;
+        private String headersDelimiter = "\t";
+        private String headersSeparator = ",";
+        private String headersKeySeparator = ":";
+        private boolean ignoreError;
+        private int lineNumber;
+        private boolean printPrompt = System.console() != null;
+        private Pattern headersSeparatorPattern;
+        private String nullMarker;
+
+        @Override
+        public void init(InputStream inputStream, Properties props) {
+            topic = props.getProperty("topic");
+            if (props.containsKey("parse.key"))
+                parseKey = props.getProperty("parse.key").trim().equalsIgnoreCase("true");
+            if (props.containsKey("key.separator"))
+                keySeparator = props.getProperty("key.separator");
+            if (props.containsKey("parse.headers"))
+                parseHeaders = props.getProperty("parse.headers").trim().equalsIgnoreCase("true");
+            if (props.containsKey("headers.delimiter"))
+                headersDelimiter = props.getProperty("headers.delimiter");
+            if (props.containsKey("headers.separator"))
+                headersSeparator = props.getProperty("headers.separator");
+            headersSeparatorPattern = Pattern.compile(headersSeparator);
+            if (props.containsKey("headers.key.separator"))
+                headersKeySeparator = props.getProperty("headers.key.separator");
+            if (props.containsKey("ignore.error"))
+                ignoreError = props.getProperty("ignore.error").trim().equalsIgnoreCase("true");
+            if (headersDelimiter.equals(headersSeparator))
+                throw new KafkaException("headers.delimiter and headers.separator may not be equal");
+            if (headersDelimiter.equals(headersKeySeparator))
+                throw new KafkaException("headers.delimiter and headers.key.separator may not be equal");
+            if (headersSeparator.equals(headersKeySeparator))
+                throw new KafkaException("headers.separator and headers.key.separator may not be equal");
+            if (props.containsKey("null.marker"))
+                nullMarker = props.getProperty("null.marker");
+            if (keySeparator.equals(nullMarker))
+                throw new KafkaException("null.marker and key.separator may not be equal");
+            if (headersSeparator.equals(nullMarker))
+                throw new KafkaException("null.marker and headers.separator may not be equal");
+            if (headersDelimiter.equals(nullMarker))
+                throw new KafkaException("null.marker and headers.delimiter may not be equal");
+            if (headersKeySeparator.equals(nullMarker))
+                throw new KafkaException("null.marker and headers.key.separator may not be equal");
+
+            reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
+        }
+
+        @Override
+        public ProducerRecord<byte[], byte[]> readMessage() {
+            ++lineNumber;
+            if (printPrompt) {
+                System.out.print(">");
+            }
+
+            String line;
+            try {
+                line = reader.readLine();
+
+            } catch (IOException e) {
+                throw new KafkaException(e);
+            }
+
+            if (line == null) {
+                return null;
+            }
+
+            String headers = parse(parseHeaders, line, 0, headersDelimiter, "headers delimiter");
+            int headerOffset = headers == null ? 0 : headers.length() + headersDelimiter.length();
+
+            String key = parse(parseKey, line, headerOffset, keySeparator, "key separator");
+            int keyOffset = key == null ? 0 : key.length() + keySeparator.length();
+
+            String value = line.substring(headerOffset + keyOffset);
+
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                    topic,
+                    key != null && !key.equals(nullMarker) ? key.getBytes(StandardCharsets.UTF_8) : null,
+                    value != null && !value.equals(nullMarker) ? value.getBytes(StandardCharsets.UTF_8) : null
+            );
+
+            if (headers != null && !headers.equals(nullMarker)) {
+                stream(splitHeaders(headers)).forEach(header -> record.headers().add(header.key, header.value));
+            }
+
+            return record;
+        }
+
+        private String parse(boolean enabled, String line, int startIndex, String demarcation, String demarcationName) {
+            if (!enabled) {
+                return null;
+            }
+            int index = line.indexOf(demarcation, startIndex);
+            if (index == -1) {
+                if (ignoreError) {
+                    return null;
+                }
+                throw new KafkaException("No " + demarcationName + " found on line number " + lineNumber + ": '" + line + "'");
+            }
+            return line.substring(startIndex, index);
+        }
+
+        private Header[] splitHeaders(String headers) {
+            return stream(headersSeparatorPattern.split(headers))
+                    .map(pair -> {
+                        int i = pair.indexOf(headersKeySeparator);
+                        if (i == -1) {
+                            if (ignoreError) {
+                                return new Header(pair, null);
+                            }
+                            throw new KafkaException("No header key separator found in pair '" + pair + "' on line number " + lineNumber);
+                        }
+
+                        String headerKey = pair.substring(0, i);
+                        if (headerKey.equals(nullMarker)) {
+                            throw new KafkaException("Header keys should not be equal to the null marker '" + nullMarker + "' as they can't be null");
+                        }
+
+                        String value = pair.substring(i + headersKeySeparator.length());
+                        byte[] headerValue = value.equals(nullMarker) ? null : value.getBytes(StandardCharsets.UTF_8);
+                        return new Header(headerKey, headerValue);
+
+                    }).toArray(Header[]::new);
+        }
+
+        // VisibleForTesting
+        String keySeparator() {
+            return keySeparator;
+        }
+
+        // VisibleForTesting
+        boolean parseKey() {
+            return parseKey;
+        }
+
+        // VisibleForTesting
+        boolean parseHeaders() {
+            return parseHeaders;
+        }
+    }
+
+    // VisibleForTesting
+    static class Header {

Review Comment:
   Why not use Header and RecordHeader?
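   What reusing the common classes would look like, sketched (assuming a dependency on the header internals package is acceptable in the tools module):
   ```java
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.header.Header;
   import org.apache.kafka.common.header.internals.RecordHeader;

   import java.nio.charset.StandardCharsets;

   class HeaderSketch {
       // RecordHeader is the standard implementation of the common Header
       // interface, so the tool-local Header class becomes unnecessary.
       static void addHeader(ProducerRecord<byte[], byte[]> record, String key, String value) {
           Header header = new RecordHeader(key, value == null ? null : value.getBytes(StandardCharsets.UTF_8));
           record.headers().add(header);
       }
   }
   ```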



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Hangleton commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1101958375


##########
tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java:
##########
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static java.util.Arrays.stream;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.METADATA_MAX_AGE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.SEND_BUFFER_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.utils.Utils.loadProps;
+import static org.apache.kafka.server.util.CommandLineUtils.maybeMergeOptions;
+import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs;
+
+/**
+ * Sends {@link ProducerRecord} instances generated from lines read from standard input.
+ */
+public class ConsoleProducer {
+    public static void main(String[] args) {
+        ConsoleProducer consoleProducer = new ConsoleProducer();
+        consoleProducer.start(args);
+    }
+
+    void start(String[] args) {
+        try {
+            ConsoleProducerConfig config = new ConsoleProducerConfig(args);
+            MessageReader reader = createMessageReader(config);
+            reader.init(System.in, config.getReaderProps());
+
+            KafkaProducer<byte[], byte[]> producer = createKafkaProducer(config.getProducerProps());
+            Exit.addShutdownHook("producer-shutdown-hook", producer::close);
+
+            ProducerRecord<byte[], byte[]> record;
+            do {
+                record = reader.readMessage();
+                if (record != null) {
+                    send(producer, record, config.sync());
+                }
+            } while (record != null);
+
+        } catch (OptionException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Exit.exit(1);
+        }
+        Exit.exit(0);
+    }
+
+    // VisibleForTesting
+    KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
+        return new KafkaProducer<>(props);
+    }
+
+    // VisibleForTesting
+    MessageReader createMessageReader(ConsoleProducerConfig config) throws ReflectiveOperationException {
+        return (MessageReader) Class.forName(config.readerClass()).getDeclaredConstructor().newInstance();
+    }
+
+    private void send(KafkaProducer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record, boolean sync) throws Exception {
+        if (sync) {
+            producer.send(record).get();
+        } else {
+            producer.send(record, new ErrorLoggingCallback(record.topic(), record.key(), record.value(), false));
+        }
+    }
+
+    public static class ConsoleProducerConfig extends CommandDefaultOptions {
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<Void> syncOpt;
+        private final OptionSpec<String> compressionCodecOpt;
+        private final OptionSpec<Integer> batchSizeOpt;
+        private final OptionSpec<Integer> messageSendMaxRetriesOpt;
+        private final OptionSpec<Long> retryBackoffMsOpt;
+        private final OptionSpec<Long> sendTimeoutOpt;
+        private final OptionSpec<String> requestRequiredAcksOpt;
+        private final OptionSpec<Integer> requestTimeoutMsOpt;
+        private final OptionSpec<Long> metadataExpiryMsOpt;
+        private final OptionSpec<Long> maxBlockMsOpt;
+        private final OptionSpec<Long> maxMemoryBytesOpt;
+        private final OptionSpec<Integer> maxPartitionMemoryBytesOpt;
+        private final OptionSpec<String> messageReaderOpt;
+        private final OptionSpec<Integer> socketBufferSizeOpt;
+        private final OptionSpec<String> propertyOpt;
+        private final OptionSpec<String> readerConfigOpt;
+        private final OptionSpec<String> producerPropertyOpt;
+        private final OptionSpec<String> producerConfigOpt;
+
+        public ConsoleProducerConfig(String[] args) {
+            super(args);
+            topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("broker-list")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list (deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .requiredUnless("broker-list")
+                    .withRequiredArg()
+                    .describedAs("server to connect to")
+                    .ofType(String.class);
+            syncOpt = parser.accepts("sync", "If set, message send requests to the brokers are sent synchronously, one at a time as they arrive.");
+            compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. " +
+                            "If specified without value, then it defaults to 'gzip'.")
+                    .withOptionalArg()
+                    .describedAs("compression-codec")
+                    .ofType(String.class);
+            batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously. " +
+                            "please note that this option will be replaced if max-partition-memory-bytes is also set")
+                    .withRequiredArg()
+                    .describedAs("size")
+                    .ofType(Integer.class)
+                    .defaultsTo(16 * 1024);
+            messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, " +
+                            "and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message. " +
+                            "This is the option to control `retries` in producer configs.")
+                    .withRequiredArg()
+                    .ofType(Integer.class)
+                    .defaultsTo(3);
+            retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. " +
+                            "Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. " +
+                            "This is the option to control `retry.backoff.ms` in producer configs.")
+                    .withRequiredArg()
+                    .ofType(Long.class)
+                    .defaultsTo(100L);
+            sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
+                            " a message will queue awaiting sufficient batch size. The value is given in ms. " +
+                            "This is the option to control `linger.ms` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("timeout_ms")
+                    .ofType(Long.class)
+                    .defaultsTo(1000L);
+            requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required `acks` of the producer requests")
+                    .withRequiredArg()
+                    .describedAs("request required acks")
+                    .ofType(String.class)
+                    .defaultsTo("-1");
+            requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero.")
+                    .withRequiredArg()
+                    .describedAs("request timeout ms")
+                    .ofType(Integer.class)
+                    .defaultsTo(1500);
+            metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms",
+                            "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. " +
+                            "This is the option to control `metadata.max.age.ms` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("metadata expiration interval")
+                    .ofType(Long.class)
+                    .defaultsTo(5 * 60 * 1000L);
+            maxBlockMsOpt = parser.accepts("max-block-ms",
+                            "The max time that the producer will block for during a send request.")
+                    .withRequiredArg()
+                    .describedAs("max block on send")
+                    .ofType(Long.class)
+                    .defaultsTo(60 * 1000L);
+            maxMemoryBytesOpt = parser.accepts("max-memory-bytes",
+                            "The total memory used by the producer to buffer records waiting to be sent to the server. " +
+                            "This is the option to control `buffer.memory` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("total memory in bytes")
+                    .ofType(Long.class)
+                    .defaultsTo(32 * 1024 * 1024L);
+            maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes",
+                            "The buffer size allocated for a partition. When records are received which are smaller than this size the producer " +
+                            "will attempt to optimistically group them together until this size is reached. " +
+                            "This is the option to control `batch.size` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("memory in bytes per partition")
+                    .ofType(Integer.class)
+                    .defaultsTo(16 * 1024);
+            messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
+                            "By default each line is read as a separate message.")
+                    .withRequiredArg()
+                    .describedAs("reader_class")
+                    .ofType(String.class)
+                    .defaultsTo(LineMessageReader.class.getName());
+            socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the TCP RECV buffer. " +
+                            "This is the option to control `send.buffer.bytes` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("size")
+                    .ofType(Integer.class)
+                    .defaultsTo(1024 * 100);
+            propertyOpt = parser.accepts("property",
+                            "A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader." +
+                            "Default properties include:" +
+                            "\n parse.key=false" +
+                            "\n parse.headers=false" +
+                            "\n ignore.error=false" +
+                            "\n key.separator=\\t" +
+                            "\n headers.delimiter=\\t" +
+                            "\n headers.separator=," +
+                            "\n headers.key.separator=:" +
+                            "\n null.marker=   When set, any fields (key, value and headers) equal to this will be replaced by null" +
+                            "\nDefault parsing pattern when:" +
+                            "\n parse.headers=true and parse.key=true:" +
+                            "\n  \"h1:v1,h2:v2...\\tkey\\tvalue\"" +
+                            "\n parse.key=true:" +
+                            "\n  \"key\\tvalue\"" +
+                            "\n parse.headers=true:" +
+                            "\n  \"h1:v1,h2:v2...\\tvalue\"")
+                    .withRequiredArg()
+                    .describedAs("prop")
+                    .ofType(String.class);
+            readerConfigOpt = parser.accepts("reader-config", "Config properties file for the message reader. Note that " + propertyOpt + " takes precedence over this config.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer. ")

Review Comment:
   Sure.
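   To make the `--property` parsing patterns above concrete, here is a minimal
   sketch in the spirit of `LineMessageReaderTest`. The `init`/`readMessage`
   signatures come straight from the hunk; the scaffolding around them is
   illustrative and assumes same-package access to the package-private
   `LineMessageReader`:
   
   ```java
   package org.apache.kafka.tools;
   
   import org.apache.kafka.clients.producer.ProducerRecord;
   
   import java.io.ByteArrayInputStream;
   import java.nio.charset.StandardCharsets;
   import java.util.Properties;
   
   public class LineFormatExample {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put("topic", "test-topic");
           props.put("parse.key", "true");
           props.put("parse.headers", "true");
   
           ConsoleProducer.LineMessageReader reader = new ConsoleProducer.LineMessageReader();
           // Default pattern for parse.headers=true and parse.key=true:
           // "h1:v1,h2:v2...\tkey\tvalue"
           reader.init(new ByteArrayInputStream(
                   "h1:v1,h2:v2\tkey0\tvalue0".getBytes(StandardCharsets.UTF_8)), props);
   
           ProducerRecord<byte[], byte[]> record = reader.readMessage();
           // record.key()     -> "key0" as UTF-8 bytes
           // record.value()   -> "value0" as UTF-8 bytes
           // record.headers() -> h1=v1 and h2=v2
       }
   }
   ```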



##########
tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java:
##########
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static java.util.Arrays.stream;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.METADATA_MAX_AGE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.SEND_BUFFER_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.utils.Utils.loadProps;
+import static org.apache.kafka.server.util.CommandLineUtils.maybeMergeOptions;
+import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs;
+
+/**
+ * Sends {@link ProducerRecord}s generated from lines read from standard input.
+ */
+public class ConsoleProducer {
+    public static void main(String[] args) {
+        ConsoleProducer consoleProducer = new ConsoleProducer();
+        consoleProducer.start(args);
+    }
+
+    void start(String[] args) {
+        try {
+            ConsoleProducerConfig config = new ConsoleProducerConfig(args);
+            MessageReader reader = createMessageReader(config);
+            reader.init(System.in, config.getReaderProps());
+
+            KafkaProducer<byte[], byte[]> producer = createKafkaProducer(config.getProducerProps());
+            Exit.addShutdownHook("producer-shutdown-hook", producer::close);
+
+            ProducerRecord<byte[], byte[]> record;
+            do {
+                record = reader.readMessage();
+                if (record != null) {
+                    send(producer, record, config.sync());
+                }
+            } while (record != null);
+
+        } catch (OptionException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Exit.exit(1);
+        }
+        Exit.exit(0);
+    }
+
+    // VisibleForTesting
+    KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
+        return new KafkaProducer<>(props);
+    }
+
+    // VisibleForTesting
+    MessageReader createMessageReader(ConsoleProducerConfig config) throws ReflectiveOperationException {
+        return (MessageReader) Class.forName(config.readerClass()).getDeclaredConstructor().newInstance();
+    }
+
+    private void send(KafkaProducer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record, boolean sync) throws Exception {
+        if (sync) {
+            producer.send(record).get();
+        } else {
+            producer.send(record, new ErrorLoggingCallback(record.topic(), record.key(), record.value(), false));
+        }
+    }
+
+    public static class ConsoleProducerConfig extends CommandDefaultOptions {
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<Void> syncOpt;
+        private final OptionSpec<String> compressionCodecOpt;
+        private final OptionSpec<Integer> batchSizeOpt;
+        private final OptionSpec<Integer> messageSendMaxRetriesOpt;
+        private final OptionSpec<Long> retryBackoffMsOpt;
+        private final OptionSpec<Long> sendTimeoutOpt;
+        private final OptionSpec<String> requestRequiredAcksOpt;
+        private final OptionSpec<Integer> requestTimeoutMsOpt;
+        private final OptionSpec<Long> metadataExpiryMsOpt;
+        private final OptionSpec<Long> maxBlockMsOpt;
+        private final OptionSpec<Long> maxMemoryBytesOpt;
+        private final OptionSpec<Integer> maxPartitionMemoryBytesOpt;
+        private final OptionSpec<String> messageReaderOpt;
+        private final OptionSpec<Integer> socketBufferSizeOpt;
+        private final OptionSpec<String> propertyOpt;
+        private final OptionSpec<String> readerConfigOpt;
+        private final OptionSpec<String> producerPropertyOpt;
+        private final OptionSpec<String> producerConfigOpt;
+
+        public ConsoleProducerConfig(String[] args) {
+            super(args);
+            topicOpt = parser.accepts("topic", "REQUIRED: The topic name to produce messages to.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified.  The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("broker-list")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list (deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .requiredUnless("broker-list")
+                    .withRequiredArg()
+                    .describedAs("server to connect to")
+                    .ofType(String.class);
+            syncOpt = parser.accepts("sync", "If set, message send requests to the brokers are sent synchronously, one at a time as they arrive.");
+            compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. " +
+                            "If specified without a value, it defaults to 'gzip'.")
+                    .withOptionalArg()
+                    .describedAs("compression-codec")
+                    .ofType(String.class);
+            batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously. " +
+                            "Please note that this option will be overridden if max-partition-memory-bytes is also set.")
+                    .withRequiredArg()
+                    .describedAs("size")
+                    .ofType(Integer.class)
+                    .defaultsTo(16 * 1024);
+            messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail to receive a message for multiple reasons, " +
+                            "and being unavailable transiently is just one of them. This property specifies the number of retries before the producer gives up and drops this message. " +
+                            "This is the option to control `retries` in producer configs.")
+                    .withRequiredArg()
+                    .ofType(Integer.class)
+                    .defaultsTo(3);
+            retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. " +
+                            "Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. " +
+                            "This is the option to control `retry.backoff.ms` in producer configs.")
+                    .withRequiredArg()
+                    .ofType(Long.class)
+                    .defaultsTo(100L);
+            sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
+                            " a message will queue awaiting sufficient batch size. The value is given in ms. " +
+                            "This is the option to control `linger.ms` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("timeout_ms")
+                    .ofType(Long.class)
+                    .defaultsTo(1000L);
+            requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required `acks` of the producer requests")
+                    .withRequiredArg()
+                    .describedAs("request required acks")
+                    .ofType(String.class)
+                    .defaultsTo("-1");
+            requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be positive.")
+                    .withRequiredArg()
+                    .describedAs("request timeout ms")
+                    .ofType(Integer.class)
+                    .defaultsTo(1500);
+            metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms",
+                            "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. " +
+                            "This is the option to control `metadata.max.age.ms` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("metadata expiration interval")
+                    .ofType(Long.class)
+                    .defaultsTo(5 * 60 * 1000L);
+            maxBlockMsOpt = parser.accepts("max-block-ms",
+                            "The max time that the producer will block for during a send request.")
+                    .withRequiredArg()
+                    .describedAs("max block on send")
+                    .ofType(Long.class)
+                    .defaultsTo(60 * 1000L);
+            maxMemoryBytesOpt = parser.accepts("max-memory-bytes",
+                            "The total memory used by the producer to buffer records waiting to be sent to the server. " +
+                            "This is the option to control `buffer.memory` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("total memory in bytes")
+                    .ofType(Long.class)
+                    .defaultsTo(32 * 1024 * 1024L);
+            maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes",
+                            "The buffer size allocated for a partition. When records smaller than this size are received, the producer " +
+                            "will attempt to optimistically group them together until this size is reached. " +
+                            "This is the option to control `batch.size` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("memory in bytes per partition")
+                    .ofType(Integer.class)
+                    .defaultsTo(16 * 1024);
+            messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
+                            "By default each line is read as a separate message.")
+                    .withRequiredArg()
+                    .describedAs("reader_class")
+                    .ofType(String.class)
+                    .defaultsTo(LineMessageReader.class.getName());
+            socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the TCP RECV buffer. " +
+                            "This is the option to control `send.buffer.bytes` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("size")
+                    .ofType(Integer.class)
+                    .defaultsTo(1024 * 100);
+            propertyOpt = parser.accepts("property",
+                            "A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader. " +
+                            "Default properties include:" +
+                            "\n parse.key=false" +
+                            "\n parse.headers=false" +
+                            "\n ignore.error=false" +
+                            "\n key.separator=\\t" +
+                            "\n headers.delimiter=\\t" +
+                            "\n headers.separator=," +
+                            "\n headers.key.separator=:" +
+                            "\n null.marker=   When set, any fields (key, value and headers) equal to this will be replaced by null" +
+                            "\nDefault parsing pattern when:" +
+                            "\n parse.headers=true and parse.key=true:" +
+                            "\n  \"h1:v1,h2:v2...\\tkey\\tvalue\"" +
+                            "\n parse.key=true:" +
+                            "\n  \"key\\tvalue\"" +
+                            "\n parse.headers=true:" +
+                            "\n  \"h1:v1,h2:v2...\\tvalue\"")
+                    .withRequiredArg()
+                    .describedAs("prop")
+                    .ofType(String.class);
+            readerConfigOpt = parser.accepts("reader-config", "Config properties file for the message reader. Note that " + propertyOpt + " takes precedence over this config.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer. ")
+                    .withRequiredArg()
+                    .describedAs("producer_prop")
+                    .ofType(String.class);
+            producerConfigOpt = parser.accepts("producer.config", "Producer config properties file. Note that " + producerPropertyOpt + " takes precedence over this config.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+
+            try {
+                options = parser.parse(args);
+
+            } catch (OptionException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from standard input and publish it to Kafka.");
+            CommandLineUtils.checkRequiredArgs(parser, options, topicOpt);
+
+            ToolsUtils.validatePortOrDie(parser, brokerHostsAndPorts());
+        }
+
+        String brokerHostsAndPorts() {
+            return options.has(bootstrapServerOpt) ? options.valueOf(bootstrapServerOpt) : options.valueOf(brokerListOpt);
+        }
+
+        boolean sync() {
+            return options.has(syncOpt);
+        }
+
+        String compressionCodec() {
+            if (options.has(compressionCodecOpt)) {
+                String codecOptValue = options.valueOf(compressionCodecOpt);
+                // Defaults to gzip if no value is provided.
+                return codecOptValue == null || codecOptValue.isEmpty() ? CompressionType.GZIP.name : codecOptValue;
+            }
+
+            return CompressionType.NONE.name;
+        }
+
+        String readerClass() {
+            return options.valueOf(messageReaderOpt);
+        }
+
+        Properties getReaderProps() throws IOException {
+            Properties properties = new Properties();
+
+            if (options.has(readerConfigOpt)) {
+                properties.putAll(loadProps(options.valueOf(readerConfigOpt)));
+            }
+
+            properties.put("topic", options.valueOf(topicOpt));
+            properties.putAll(parseKeyValueArgs(options.valuesOf(propertyOpt)));
+            return properties;
+        }
+
+        Properties getProducerProps() throws IOException {
+            Properties properties = new Properties();
+
+            if (options.has(producerConfigOpt)) {
+                properties.putAll(loadProps(options.valueOf(producerConfigOpt)));
+            }
+
+            properties.putAll(parseKeyValueArgs(options.valuesOf(producerPropertyOpt)));
+            properties.put(BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts());
+            properties.put(COMPRESSION_TYPE_CONFIG, compressionCodec());
+            properties.putIfAbsent(CLIENT_ID_CONFIG, "console-producer");
+            properties.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+            properties.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+            maybeMergeOptions(properties, LINGER_MS_CONFIG, options, sendTimeoutOpt);
+            maybeMergeOptions(properties, ACKS_CONFIG, options, requestRequiredAcksOpt);
+            maybeMergeOptions(properties, REQUEST_TIMEOUT_MS_CONFIG, options, requestTimeoutMsOpt);
+            maybeMergeOptions(properties, RETRIES_CONFIG, options, messageSendMaxRetriesOpt);
+            maybeMergeOptions(properties, RETRY_BACKOFF_MS_CONFIG, options, retryBackoffMsOpt);
+            maybeMergeOptions(properties, SEND_BUFFER_CONFIG, options, socketBufferSizeOpt);
+            maybeMergeOptions(properties, BUFFER_MEMORY_CONFIG, options, maxMemoryBytesOpt);
+            // We currently have 2 options to set the batch.size value. We'll deprecate/remove one of them in KIP-717.
+            maybeMergeOptions(properties, BATCH_SIZE_CONFIG, options, batchSizeOpt);
+            maybeMergeOptions(properties, BATCH_SIZE_CONFIG, options, maxPartitionMemoryBytesOpt);
+            maybeMergeOptions(properties, METADATA_MAX_AGE_CONFIG, options, metadataExpiryMsOpt);
+            maybeMergeOptions(properties, MAX_BLOCK_MS_CONFIG, options, maxBlockMsOpt);
+
+            return properties;
+        }
+    }
+
+    static final class LineMessageReader implements MessageReader {
+        private String topic;
+        private BufferedReader reader;
+        private boolean parseKey;
+        private String keySeparator = "\t";
+        private boolean parseHeaders;
+        private String headersDelimiter = "\t";
+        private String headersSeparator = ",";
+        private String headersKeySeparator = ":";
+        private boolean ignoreError;
+        private int lineNumber;
+        private boolean printPrompt = System.console() != null;
+        private Pattern headersSeparatorPattern;
+        private String nullMarker;
+
+        @Override
+        public void init(InputStream inputStream, Properties props) {
+            topic = props.getProperty("topic");
+            if (props.containsKey("parse.key"))
+                parseKey = props.getProperty("parse.key").trim().equalsIgnoreCase("true");
+            if (props.containsKey("key.separator"))
+                keySeparator = props.getProperty("key.separator");
+            if (props.containsKey("parse.headers"))
+                parseHeaders = props.getProperty("parse.headers").trim().equalsIgnoreCase("true");
+            if (props.containsKey("headers.delimiter"))
+                headersDelimiter = props.getProperty("headers.delimiter");
+            if (props.containsKey("headers.separator"))
+                headersSeparator = props.getProperty("headers.separator");
+            headersSeparatorPattern = Pattern.compile(headersSeparator);
+            if (props.containsKey("headers.key.separator"))
+                headersKeySeparator = props.getProperty("headers.key.separator");
+            if (props.containsKey("ignore.error"))
+                ignoreError = props.getProperty("ignore.error").trim().equalsIgnoreCase("true");
+            if (headersDelimiter.equals(headersSeparator))
+                throw new KafkaException("headers.delimiter and headers.separator may not be equal");
+            if (headersDelimiter.equals(headersKeySeparator))
+                throw new KafkaException("headers.delimiter and headers.key.separator may not be equal");
+            if (headersSeparator.equals(headersKeySeparator))
+                throw new KafkaException("headers.separator and headers.key.separator may not be equal");
+            if (props.containsKey("null.marker"))
+                nullMarker = props.getProperty("null.marker");
+            if (keySeparator.equals(nullMarker))
+                throw new KafkaException("null.marker and key.separator may not be equal");
+            if (headersSeparator.equals(nullMarker))
+                throw new KafkaException("null.marker and headers.separator may not be equal");
+            if (headersDelimiter.equals(nullMarker))
+                throw new KafkaException("null.marker and headers.delimiter may not be equal");
+            if (headersKeySeparator.equals(nullMarker))
+                throw new KafkaException("null.marker and headers.key.separator may not be equal");
+
+            reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
+        }
+
+        @Override
+        public ProducerRecord<byte[], byte[]> readMessage() {
+            ++lineNumber;
+            if (printPrompt) {
+                System.out.print(">");
+            }
+
+            String line;
+            try {
+                line = reader.readLine();
+
+            } catch (IOException e) {
+                throw new KafkaException(e);
+            }
+
+            if (line == null) {
+                return null;
+            }
+
+            String headers = parse(parseHeaders, line, 0, headersDelimiter, "headers delimiter");
+            int headerOffset = headers == null ? 0 : headers.length() + headersDelimiter.length();
+
+            String key = parse(parseKey, line, headerOffset, keySeparator, "key separator");
+            int keyOffset = key == null ? 0 : key.length() + keySeparator.length();
+
+            String value = line.substring(headerOffset + keyOffset);
+
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                    topic,
+                    key != null && !key.equals(nullMarker) ? key.getBytes(StandardCharsets.UTF_8) : null,
+                    value != null && !value.equals(nullMarker) ? value.getBytes(StandardCharsets.UTF_8) : null
+            );
+
+            if (headers != null && !headers.equals(nullMarker)) {
+                stream(splitHeaders(headers)).forEach(header -> record.headers().add(header.key, header.value));
+            }
+
+            return record;
+        }
+
+        private String parse(boolean enabled, String line, int startIndex, String demarcation, String demarcationName) {
+            if (!enabled) {
+                return null;
+            }
+            int index = line.indexOf(demarcation, startIndex);
+            if (index == -1) {
+                if (ignoreError) {
+                    return null;
+                }
+                throw new KafkaException("No " + demarcationName + " found on line number " + lineNumber + ": '" + line + "'");
+            }
+            return line.substring(startIndex, index);
+        }
+
+        private Header[] splitHeaders(String headers) {
+            return stream(headersSeparatorPattern.split(headers))
+                    .map(pair -> {
+                        int i = pair.indexOf(headersKeySeparator);
+                        if (i == -1) {
+                            if (ignoreError) {
+                                return new Header(pair, null);
+                            }
+                            throw new KafkaException("No header key separator found in pair '" + pair + "' on line number " + lineNumber);
+                        }
+
+                        String headerKey = pair.substring(0, i);
+                        if (headerKey.equals(nullMarker)) {
+                            throw new KafkaException("Header keys should not be equal to the null marker '" + nullMarker + "' as they can't be null");
+                        }
+
+                        String value = pair.substring(i + headersKeySeparator.length());
+                        byte[] headerValue = value.equals(nullMarker) ? null : value.getBytes(StandardCharsets.UTF_8);
+                        return new Header(headerKey, headerValue);
+
+                    }).toArray(Header[]::new);
+        }
+
+        // VisibleForTesting
+        String keySeparator() {
+            return keySeparator;
+        }
+
+        // VisibleForTesting
+        boolean parseKey() {
+            return parseKey;
+        }
+
+        // VisibleForTesting
+        boolean parseHeaders() {
+            return parseHeaders;
+        }
+    }
+
+    // VisibleForTesting
+    static class Header {

Review Comment:
   They are perfect, indeed.
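   Since `createMessageReader` instantiates the configured class reflectively
   through its no-arg constructor, any `MessageReader` implementation on the
   classpath can be plugged in with `--line-reader`. A minimal sketch of a
   custom reader, assuming the `MessageReader` interface used above lives in
   `org.apache.kafka.tools` (the class name below is hypothetical):
   
   ```java
   package org.apache.kafka.tools;
   
   import org.apache.kafka.clients.producer.ProducerRecord;
   
   import java.io.BufferedReader;
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.InputStreamReader;
   import java.io.UncheckedIOException;
   import java.nio.charset.StandardCharsets;
   import java.util.Properties;
   
   // Hypothetical custom reader: upper-cases every input line before producing it.
   public class UpperCaseMessageReader implements MessageReader {
       private String topic;
       private BufferedReader reader;
   
       @Override
       public void init(InputStream inputStream, Properties props) {
           topic = props.getProperty("topic");
           reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
       }
   
       @Override
       public ProducerRecord<byte[], byte[]> readMessage() {
           try {
               String line = reader.readLine();
               // Returning null ends the produce loop in ConsoleProducer.start().
               return line == null
                       ? null
                       : new ProducerRecord<>(topic, line.toUpperCase().getBytes(StandardCharsets.UTF_8));
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   }
   ```
   
   It would be selected with `--line-reader org.apache.kafka.tools.UpperCaseMessageReader`;
   a public no-arg constructor is required because the factory calls
   `getDeclaredConstructor().newInstance()`.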



##########
server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java:
##########
@@ -16,17 +16,29 @@
  */
 package org.apache.kafka.server.util;
 
+import joptsimple.OptionParser;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.PrintStream;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static java.util.Arrays.stream;
+
 public class ToolsUtils {
 
+    public static void validatePortOrDie(OptionParser parser, String hostPort) {

Review Comment:
   Thanks.
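   The body of `validatePortOrDie` falls outside the quoted context, but given
   the imports this hunk adds (`java.util.Arrays.stream`, `Utils`), a plausible
   minimal sketch is the following; treat it as an illustration rather than the
   merged code:
   
   ```java
   // Sketch only: validate each HOST:PORT entry of a comma-separated list and
   // exit with usage help when an entry cannot be parsed. Utils.getPort returns
   // null for unparsable entries; CommandLineUtils is in the same package.
   public static void validatePortOrDie(OptionParser parser, String hostPort) {
       boolean validHostPorts = !hostPort.isEmpty() && stream(hostPort.split(","))
               .allMatch(entry -> Utils.getPort(entry) != null);
       if (!validHostPorts) {
           CommandLineUtils.printUsageAndExit(parser,
                   "Please provide a valid list of broker host:port entries, e.g. host1:9091,host2:9092");
       }
   }
   ```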



##########
tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java:
##########
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static java.util.Arrays.stream;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.METADATA_MAX_AGE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.SEND_BUFFER_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.utils.Utils.loadProps;
+import static org.apache.kafka.server.util.CommandLineUtils.maybeMergeOptions;
+import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs;
+
+/**
+ * Sends {@link ProducerRecord}s generated from lines read from standard input.
+ */
+public class ConsoleProducer {
+    public static void main(String[] args) {
+        ConsoleProducer consoleProducer = new ConsoleProducer();
+        consoleProducer.start(args);
+    }
+
+    void start(String[] args) {
+        try {
+            ConsoleProducerConfig config = new ConsoleProducerConfig(args);
+            MessageReader reader = createMessageReader(config);
+            reader.init(System.in, config.getReaderProps());
+
+            KafkaProducer<byte[], byte[]> producer = createKafkaProducer(config.getProducerProps());
+            Exit.addShutdownHook("producer-shutdown-hook", producer::close);
+
+            ProducerRecord<byte[], byte[]> record;
+            do {
+                record = reader.readMessage();
+                if (record != null) {
+                    send(producer, record, config.sync());
+                }
+            } while (record != null);
+
+        } catch (OptionException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Exit.exit(1);
+        }
+        Exit.exit(0);
+    }
+
+    // VisibleForTesting
+    KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
+        return new KafkaProducer<>(props);
+    }
+
+    // VisibleForTesting
+    MessageReader createMessageReader(ConsoleProducerConfig config) throws ReflectiveOperationException {
+        return (MessageReader) Class.forName(config.readerClass()).getDeclaredConstructor().newInstance();
+    }
+
+    private void send(KafkaProducer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record, boolean sync) throws Exception {
+        if (sync) {
+            producer.send(record).get();
+        } else {
+            producer.send(record, new ErrorLoggingCallback(record.topic(), record.key(), record.value(), false));
+        }
+    }
+
+    public static class ConsoleProducerConfig extends CommandDefaultOptions {
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<Void> syncOpt;
+        private final OptionSpec<String> compressionCodecOpt;
+        private final OptionSpec<Integer> batchSizeOpt;
+        private final OptionSpec<Integer> messageSendMaxRetriesOpt;
+        private final OptionSpec<Long> retryBackoffMsOpt;
+        private final OptionSpec<Long> sendTimeoutOpt;
+        private final OptionSpec<String> requestRequiredAcksOpt;
+        private final OptionSpec<Integer> requestTimeoutMsOpt;
+        private final OptionSpec<Long> metadataExpiryMsOpt;
+        private final OptionSpec<Long> maxBlockMsOpt;
+        private final OptionSpec<Long> maxMemoryBytesOpt;
+        private final OptionSpec<Integer> maxPartitionMemoryBytesOpt;
+        private final OptionSpec<String> messageReaderOpt;
+        private final OptionSpec<Integer> socketBufferSizeOpt;
+        private final OptionSpec<String> propertyOpt;
+        private final OptionSpec<String> readerConfigOpt;
+        private final OptionSpec<String> producerPropertyOpt;
+        private final OptionSpec<String> producerConfigOpt;
+
+        public ConsoleProducerConfig(String[] args) {
+            super(args);
+            topicOpt = parser.accepts("topic", "REQUIRED: The topic name to produce messages to.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified.  The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("broker-list")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list (deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .requiredUnless("broker-list")
+                    .withRequiredArg()
+                    .describedAs("server to connect to")
+                    .ofType(String.class);
+            syncOpt = parser.accepts("sync", "If set, message send requests to the brokers are sent synchronously, one at a time as they arrive.");
+            compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. " +
+                            "If specified without a value, it defaults to 'gzip'.")
+                    .withOptionalArg()
+                    .describedAs("compression-codec")
+                    .ofType(String.class);
+            batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously. " +
+                            "Please note that this option will be overridden if max-partition-memory-bytes is also set.")
+                    .withRequiredArg()
+                    .describedAs("size")
+                    .ofType(Integer.class)
+                    .defaultsTo(16 * 1024);
+            messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail to receive a message for multiple reasons, " +
+                            "and being unavailable transiently is just one of them. This property specifies the number of retries before the producer gives up and drops this message. " +
+                            "This is the option to control `retries` in producer configs.")
+                    .withRequiredArg()
+                    .ofType(Integer.class)
+                    .defaultsTo(3);
+            retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. " +
+                            "Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. " +
+                            "This is the option to control `retry.backoff.ms` in producer configs.")
+                    .withRequiredArg()
+                    .ofType(Long.class)
+                    .defaultsTo(100L);
+            sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
+                            " a message will queue awaiting sufficient batch size. The value is given in ms. " +
+                            "This is the option to control `linger.ms` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("timeout_ms")
+                    .ofType(Long.class)
+                    .defaultsTo(1000L);
+            requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required `acks` of the producer requests")
+                    .withRequiredArg()
+                    .describedAs("request required acks")
+                    .ofType(String.class)
+                    .defaultsTo("-1");
+            requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be positive.")
+                    .withRequiredArg()
+                    .describedAs("request timeout ms")
+                    .ofType(Integer.class)
+                    .defaultsTo(1500);
+            metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms",
+                            "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. " +
+                            "This is the option to control `metadata.max.age.ms` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("metadata expiration interval")
+                    .ofType(Long.class)
+                    .defaultsTo(5 * 60 * 1000L);
+            maxBlockMsOpt = parser.accepts("max-block-ms",
+                            "The max time that the producer will block for during a send request.")
+                    .withRequiredArg()
+                    .describedAs("max block on send")
+                    .ofType(Long.class)
+                    .defaultsTo(60 * 1000L);
+            maxMemoryBytesOpt = parser.accepts("max-memory-bytes",
+                            "The total memory used by the producer to buffer records waiting to be sent to the server. " +
+                            "This is the option to control `buffer.memory` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("total memory in bytes")
+                    .ofType(Long.class)
+                    .defaultsTo(32 * 1024 * 1024L);
+            maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes",
+                            "The buffer size allocated for a partition. When records smaller than this size are received, the producer " +
+                            "will attempt to optimistically group them together until this size is reached. " +
+                            "This is the option to control `batch.size` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("memory in bytes per partition")
+                    .ofType(Integer.class)
+                    .defaultsTo(16 * 1024);
+            messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
+                            "By default each line is read as a separate message.")
+                    .withRequiredArg()
+                    .describedAs("reader_class")
+                    .ofType(String.class)
+                    .defaultsTo(LineMessageReader.class.getName());
+            socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the TCP RECV buffer. " +
+                            "This is the option to control `send.buffer.bytes` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("size")
+                    .ofType(Integer.class)
+                    .defaultsTo(1024 * 100);
+            propertyOpt = parser.accepts("property",
+                            "A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader. " +
+                            "Default properties include:" +
+                            "\n parse.key=false" +
+                            "\n parse.headers=false" +
+                            "\n ignore.error=false" +
+                            "\n key.separator=\\t" +
+                            "\n headers.delimiter=\\t" +
+                            "\n headers.separator=," +
+                            "\n headers.key.separator=:" +
+                            "\n null.marker=   When set, any fields (key, value and headers) equal to this will be replaced by null" +
+                            "\nDefault parsing pattern when:" +
+                            "\n parse.headers=true and parse.key=true:" +
+                            "\n  \"h1:v1,h2:v2...\\tkey\\tvalue\"" +
+                            "\n parse.key=true:" +
+                            "\n  \"key\\tvalue\"" +
+                            "\n parse.headers=true:" +
+                            "\n  \"h1:v1,h2:v2...\\tvalue\"")
+                    .withRequiredArg()
+                    .describedAs("prop")
+                    .ofType(String.class);
+            readerConfigOpt = parser.accepts("reader-config", "Config properties file for the message reader. Note that " + propertyOpt + " takes precedence over this config.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer. ")
+                    .withRequiredArg()
+                    .describedAs("producer_prop")
+                    .ofType(String.class);
+            producerConfigOpt = parser.accepts("producer.config", "Producer config properties file. Note that " + producerPropertyOpt + " takes precedence over this config.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+
+            try {
+                options = parser.parse(args);
+
+            } catch (OptionException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from standard input and publish it to Kafka.");
+            CommandLineUtils.checkRequiredArgs(parser, options, topicOpt);
+
+            ToolsUtils.validatePortOrDie(parser, brokerHostsAndPorts());

Review Comment:
   Thanks.
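   One consequence of how the constructor above wires `getProducerProps()`:
   values from the `--producer.config` file are loaded first, `--producer-property`
   pairs override them, and `maybeMergeOptions` gives an explicitly passed
   dedicated option the last word. A small sketch of that precedence (the class
   is hypothetical; same-package access is assumed since `getProducerProps` is
   package-private):
   
   ```java
   package org.apache.kafka.tools;
   
   import java.io.IOException;
   import java.util.Properties;
   
   public class ProducerPropsPrecedenceExample {
       public static void main(String[] args) throws IOException {
           ConsoleProducer.ConsoleProducerConfig config = new ConsoleProducer.ConsoleProducerConfig(new String[] {
                   "--bootstrap-server", "localhost:9092",
                   "--topic", "test-topic",
                   "--producer-property", "linger.ms=50", // generic key=value pair
                   "--timeout", "100"                     // dedicated option mapped to linger.ms
           });
           Properties props = config.getProducerProps();
           // maybeMergeOptions applies the explicitly passed --timeout last:
           // props.getProperty("linger.ms") -> "100"
       }
   }
   ```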



##########
tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java:
##########
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static java.util.Arrays.stream;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.METADATA_MAX_AGE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.SEND_BUFFER_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.utils.Utils.loadProps;
+import static org.apache.kafka.server.util.CommandLineUtils.maybeMergeOptions;
+import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs;
+
+/**
+ * Sends {@link ProducerRecord}s generated from lines read from standard input.
+ */
+public class ConsoleProducer {
+    public static void main(String[] args) {
+        ConsoleProducer consoleProducer = new ConsoleProducer();
+        consoleProducer.start(args);
+    }
+
+    void start(String[] args) {
+        try {
+            ConsoleProducerConfig config = new ConsoleProducerConfig(args);
+            MessageReader reader = createMessageReader(config);
+            reader.init(System.in, config.getReaderProps());
+
+            KafkaProducer<byte[], byte[]> producer = createKafkaProducer(config.getProducerProps());
+            Exit.addShutdownHook("producer-shutdown-hook", producer::close);
+
+            ProducerRecord<byte[], byte[]> record;
+            do {
+                record = reader.readMessage();
+                if (record != null) {
+                    send(producer, record, config.sync());
+                }
+            } while (record != null);
+
+        } catch (OptionException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Exit.exit(1);
+        }
+        Exit.exit(0);
+    }
+
+    // VisibleForTesting
+    KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
+        return new KafkaProducer<>(props);
+    }
+
+    // VisibleForTesting
+    MessageReader createMessageReader(ConsoleProducerConfig config) throws ReflectiveOperationException {
+        return (MessageReader) Class.forName(config.readerClass()).getDeclaredConstructor().newInstance();
+    }
+
+    private void send(KafkaProducer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record, boolean sync) throws Exception {
+        if (sync) {
+            producer.send(record).get();
+        } else {
+            producer.send(record, new ErrorLoggingCallback(record.topic(), record.key(), record.value(), false));
+        }
+    }
+
+    public static class ConsoleProducerConfig extends CommandDefaultOptions {
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<Void> syncOpt;
+        private final OptionSpec<String> compressionCodecOpt;
+        private final OptionSpec<Integer> batchSizeOpt;
+        private final OptionSpec<Integer> messageSendMaxRetriesOpt;
+        private final OptionSpec<Long> retryBackoffMsOpt;
+        private final OptionSpec<Long> sendTimeoutOpt;
+        private final OptionSpec<String> requestRequiredAcksOpt;
+        private final OptionSpec<Integer> requestTimeoutMsOpt;
+        private final OptionSpec<Long> metadataExpiryMsOpt;
+        private final OptionSpec<Long> maxBlockMsOpt;
+        private final OptionSpec<Long> maxMemoryBytesOpt;
+        private final OptionSpec<Integer> maxPartitionMemoryBytesOpt;
+        private final OptionSpec<String> messageReaderOpt;
+        private final OptionSpec<Integer> socketBufferSizeOpt;
+        private final OptionSpec<String> propertyOpt;
+        private final OptionSpec<String> readerConfigOpt;
+        private final OptionSpec<String> producerPropertyOpt;
+        private final OptionSpec<String> producerConfigOpt;
+
+        public ConsoleProducerConfig(String[] args) {
+            super(args);
+            topicOpt = parser.accepts("topic", "REQUIRED: The topic name to produce messages to.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified.  The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("broker-list")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list (deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .requiredUnless("broker-list")
+                    .withRequiredArg()
+                    .describedAs("server to connect to")
+                    .ofType(String.class);
+            syncOpt = parser.accepts("sync", "If set, message send requests to the brokers are sent synchronously, one at a time as they arrive.");
+            compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. " +
+                            "If specified without a value, it defaults to 'gzip'.")
+                    .withOptionalArg()
+                    .describedAs("compression-codec")
+                    .ofType(String.class);
+            batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously. " +
+                            "Please note that this option will be overridden if max-partition-memory-bytes is also set.")
+                    .withRequiredArg()
+                    .describedAs("size")
+                    .ofType(Integer.class)
+                    .defaultsTo(16 * 1024);
+            messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail to receive a message for multiple reasons, " +
+                            "and being unavailable transiently is just one of them. This property specifies the number of retries before the producer gives up and drops this message. " +
+                            "This is the option to control `retries` in producer configs.")
+                    .withRequiredArg()
+                    .ofType(Integer.class)
+                    .defaultsTo(3);
+            retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. " +
+                            "Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. " +
+                            "This is the option to control `retry.backoff.ms` in producer configs.")
+                    .withRequiredArg()
+                    .ofType(Long.class)
+                    .defaultsTo(100L);
+            sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
+                            " a message will queue awaiting sufficient batch size. The value is given in ms. " +
+                            "This is the option to control `linger.ms` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("timeout_ms")
+                    .ofType(Long.class)
+                    .defaultsTo(1000L);
+            requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required `acks` of the producer requests")
+                    .withRequiredArg()
+                    .describedAs("request required acks")
+                    .ofType(String.class)
+                    .defaultsTo("-1");
+            requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be positive.")
+                    .withRequiredArg()
+                    .describedAs("request timeout ms")
+                    .ofType(Integer.class)
+                    .defaultsTo(1500);
+            metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms",
+                            "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. " +
+                            "This is the option to control `metadata.max.age.ms` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("metadata expiration interval")
+                    .ofType(Long.class)
+                    .defaultsTo(5 * 60 * 1000L);
+            maxBlockMsOpt = parser.accepts("max-block-ms",
+                            "The max time that the producer will block for during a send request.")
+                    .withRequiredArg()
+                    .describedAs("max block on send")
+                    .ofType(Long.class)
+                    .defaultsTo(60 * 1000L);
+            maxMemoryBytesOpt = parser.accepts("max-memory-bytes",
+                            "The total memory used by the producer to buffer records waiting to be sent to the server. " +
+                            "This is the option to control `buffer.memory` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("total memory in bytes")
+                    .ofType(Long.class)
+                    .defaultsTo(32 * 1024 * 1024L);
+            maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes",
+                            "The buffer size allocated for a partition. When records are received which are smaller than this size the producer " +
+                            "will attempt to optimistically group them together until this size is reached. " +
+                            "This is the option to control `batch.size` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("memory in bytes per partition")
+                    .ofType(Integer.class)
+                    .defaultsTo(16 * 1024);
+            messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
+                            "By default each line is read as a separate message.")
+                    .withRequiredArg()
+                    .describedAs("reader_class")
+                    .ofType(String.class)
+                    .defaultsTo(LineMessageReader.class.getName());
+            socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the TCP RECV buffer. " +
+                            "This is the option to control `send.buffer.bytes` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("size")
+                    .ofType(Integer.class)
+                    .defaultsTo(1024 * 100);
+            propertyOpt = parser.accepts("property",
+                            "A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader." +
+                            "Default properties include:" +
+                            "\n parse.key=false" +
+                            "\n parse.headers=false" +
+                            "\n ignore.error=false" +
+                            "\n key.separator=\\t" +
+                            "\n headers.delimiter=\\t" +
+                            "\n headers.separator=," +
+                            "\n headers.key.separator=:" +
+                            "\n null.marker=   When set, any fields (key, value and headers) equal to this will be replaced by null" +
+                            "\nDefault parsing pattern when:" +
+                            "\n parse.headers=true and parse.key=true:" +
+                            "\n  \"h1:v1,h2:v2...\\tkey\\tvalue\"" +
+                            "\n parse.key=true:" +
+                            "\n  \"key\\tvalue\"" +
+                            "\n parse.headers=true:" +
+                            "\n  \"h1:v1,h2:v2...\\tvalue\"")
+                    .withRequiredArg()
+                    .describedAs("prop")
+                    .ofType(String.class);
+            readerConfigOpt = parser.accepts("reader-config", "Config properties file for the message reader. Note that " + propertyOpt + " takes precedence over this config.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer. ")
+                    .withRequiredArg()
+                    .describedAs("producer_prop")
+                    .ofType(String.class);
+            producerConfigOpt = parser.accepts("producer.config", "Producer config properties file. Note that " + producerPropertyOpt + " takes precedence over this config.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from standard input and publish it to Kafka.");
+            CommandLineUtils.checkRequiredArgs(parser, options, topicOpt);
+
+            ToolsUtils.validatePortOrDie(parser, brokerHostsAndPorts());
+        }
+
+        String brokerHostsAndPorts() {
+            return options.has(bootstrapServerOpt) ? options.valueOf(bootstrapServerOpt) : options.valueOf(brokerListOpt);
+        }
+
+        boolean sync() {
+            return options.has(syncOpt);
+        }
+
+        String compressionCodec() {
+            if (options.has(compressionCodecOpt)) {
+                String codecOptValue = options.valueOf(compressionCodecOpt);
+                // Defaults to gzip if no value is provided.
+                return codecOptValue == null || codecOptValue.isEmpty() ? CompressionType.GZIP.name : codecOptValue;
+            }
+
+            return CompressionType.NONE.name;
+        }
+
+        String readerClass() {
+            return options.valueOf(messageReaderOpt);
+        }
+
+        Properties getReaderProps() throws IOException {
+            Properties properties = new Properties();
+
+            if (options.has(readerConfigOpt)) {
+                properties.putAll(loadProps(options.valueOf(readerConfigOpt)));
+            }
+
+            properties.put("topic", options.valueOf(topicOpt));
+            properties.putAll(parseKeyValueArgs(options.valuesOf(propertyOpt)));
+            return properties;
+        }
+
+        Properties getProducerProps() throws IOException {
+            Properties properties = new Properties();
+
+            if (options.has(producerConfigOpt)) {
+                properties.putAll(loadProps(options.valueOf(producerConfigOpt)));
+            }
+
+            properties.putAll(parseKeyValueArgs(options.valuesOf(producerPropertyOpt)));
+            properties.put(BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts());
+            properties.put(COMPRESSION_TYPE_CONFIG, compressionCodec());
+            properties.putIfAbsent(CLIENT_ID_CONFIG, "console-producer");
+            properties.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+            properties.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+            maybeMergeOptions(properties, LINGER_MS_CONFIG, options, sendTimeoutOpt);
+            maybeMergeOptions(properties, ACKS_CONFIG, options, requestRequiredAcksOpt);
+            maybeMergeOptions(properties, REQUEST_TIMEOUT_MS_CONFIG, options, requestTimeoutMsOpt);
+            maybeMergeOptions(properties, RETRIES_CONFIG, options, messageSendMaxRetriesOpt);
+            maybeMergeOptions(properties, RETRY_BACKOFF_MS_CONFIG, options, retryBackoffMsOpt);
+            maybeMergeOptions(properties, SEND_BUFFER_CONFIG, options, socketBufferSizeOpt);
+            maybeMergeOptions(properties, BUFFER_MEMORY_CONFIG, options, maxMemoryBytesOpt);
+            // We currently have 2 options to set the batch.size value. We'll deprecate/remove one of them in KIP-717.
+            maybeMergeOptions(properties, BATCH_SIZE_CONFIG, options, batchSizeOpt);
+            maybeMergeOptions(properties, BATCH_SIZE_CONFIG, options, maxPartitionMemoryBytesOpt);
+            maybeMergeOptions(properties, METADATA_MAX_AGE_CONFIG, options, metadataExpiryMsOpt);
+            maybeMergeOptions(properties, MAX_BLOCK_MS_CONFIG, options, maxBlockMsOpt);
+
+            return properties;
+        }
+    }
+
+    static final class LineMessageReader implements MessageReader {

Review Comment:
   Makes sense.
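   For reference, the optional-argument handling above gives `--compression-codec` three distinct outcomes. A minimal standalone sketch mirroring the resolution logic of `compressionCodec()` quoted above (this is illustrative, not part of the PR):
   
   ```java
   public class CompressionCodecResolution {
       // Mirrors ConsoleProducerOptions.compressionCodec() above.
       static String resolve(boolean optionPresent, String value) {
           if (optionPresent) {
               // Flag given without a value: default to gzip.
               return value == null || value.isEmpty() ? "gzip" : value;
           }
           // Flag absent: no compression.
           return "none";
       }
   
       public static void main(String[] args) {
           System.out.println(resolve(false, null));  // prints "none"
           System.out.println(resolve(true, null));   // prints "gzip"
           System.out.println(resolve(true, "zstd")); // prints "zstd"
       }
   }
   ```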



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on PR #13214:
URL: https://github.com/apache/kafka/pull/13214#issuecomment-1424569047

   @showuon if you have some time, this looks almost ready.




[GitHub] [kafka] fvaleri commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1104451234


##########
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * Typical implementations of this interface convert data from an {@link InputStream} received via
+ * {@link MessageReader#init(InputStream, Properties)} into a {@link ProducerRecord} instance on each
+ * invocation of `{@link MessageReader#readMessage()}`.
+ *
+ * This is used by the {@link ConsoleProducer}.
+ */
+public interface MessageReader {

Review Comment:
   @Hangleton Do you already have a strategy to deal with this? How are you planning to provide a deprecation period? Can we generalize this approach?





[GitHub] [kafka] fvaleri commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1108345004


##########
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##########

Review Comment:
   https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines
   
   We also revived KIP-641. Any feedback is welcome.





[GitHub] [kafka] Hangleton commented on pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on PR #13214:
URL: https://github.com/apache/kafka/pull/13214#issuecomment-1422354781

   @fvaleri 




[GitHub] [kafka] Hangleton commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1100547585


##########
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##########

Review Comment:
   Note that this class is only used by the `ConsoleProducer` and has one implementation. If no further use is intended, it can be removed.
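   For illustration, a user-supplied implementation plugged in via `--line-reader` could look like the sketch below (the class is hypothetical and assumed to sit in the same package as the interface quoted above; the `topic` property is the one set by the options class):
   
   ```java
   import org.apache.kafka.clients.producer.ProducerRecord;
   
   import java.io.BufferedReader;
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.InputStreamReader;
   import java.io.UncheckedIOException;
   import java.nio.charset.StandardCharsets;
   import java.util.Properties;
   
   // Hypothetical custom reader: upper-cases every line before producing it.
   public class UpperCaseMessageReader implements MessageReader {
       private BufferedReader reader;
       private String topic;
   
       @Override
       public void init(InputStream inputStream, Properties props) {
           reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
           topic = props.getProperty("topic");
       }
   
       @Override
       public ProducerRecord<byte[], byte[]> readMessage() {
           try {
               String line = reader.readLine();
               if (line == null) {
                   return null; // end of input
               }
               byte[] value = line.toUpperCase().getBytes(StandardCharsets.UTF_8);
               return new ProducerRecord<>(topic, value);
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   }
   ```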





[GitHub] [kafka] fvaleri commented on pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on PR #13214:
URL: https://github.com/apache/kafka/pull/13214#issuecomment-1625147370

   Hi @Hangleton, what's the state of this PR? Let me know if you need some help/review.




[GitHub] [kafka] Hangleton commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1104315943


##########
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##########

Review Comment:
   Thanks David for reviewing the PR. Agreed that a KIP is appropriate given the compatibility changes to SPIs you pointed out.





[GitHub] [kafka] Hangleton commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1102625366


##########
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##########

Review Comment:
   Agreed on taking an intermediate step to preserve FQN compatibility for the SPI.





[GitHub] [kafka] Hangleton commented on pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on PR #13214:
URL: https://github.com/apache/kafka/pull/13214#issuecomment-1438048548

   Please ignore the request for review, Frederico - I was learning about the refresh icon on GitHub...




[GitHub] [kafka] clolov commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1100162179


##########
bin/kafka-console-producer.sh:
##########
@@ -17,4 +17,4 @@
 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
-exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ConsoleProducer "$@"

Review Comment:
   I believe the same change needs to be done in `bin/windows/kafka-console-producer.bat`





[GitHub] [kafka] fvaleri commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1102548459


##########
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##########

Review Comment:
   This is tricky. It's not a class from the public API, but it's exposed via the command line and definitely used (search for AvroMessageReader for example). I also found the still open KIP-641 which deals with this interface. 
   
   We should have a deprecation period, but at the same time we don't want to depend on core. Given that checkstyle can only handle a single root package, the only solution I see is to create a :tools:deprecated sub-module containing kafka.common.MessageReader.
   
   @mimaison @showuon WDYT?





[GitHub] [kafka] Hangleton commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1104482083


##########
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##########

Review Comment:
   - For `MessageReader`: maybe we could provide a deprecation period until Apache Kafka 4.0 by exposing the class defined in [KIP-641](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866569) and keeping the interface `kafka.common.MessageReader` inheriting from `org.apache.kafka.common.MessageReader`, to avoid breaking the implementations above? The class `org.apache.kafka.common.MessageReader` could be put in the `clients` module (KIP-641 suggests having `MessageReader` there since it is a public interface). A minimal sketch of such a shim follows this list.
   - For the JMX Tool: maybe we could keep the existing class with a deprecation warning as you did in [PR-13195](https://github.com/apache/kafka/pull/13195) and create shell and bat scripts as the public contract for the new class?
   - `kafka.tools.StateChangeLogMerger` could be removed if it isn't used as mentioned in [PR-13171](https://github.com/apache/kafka/pull/13171).
   - There is a missing BAT script for `FeatureCommand` although the shell script is defined. This could be added. 
   - I skimmed through the other command/tool classes and all of them seem to be exposed via a script, so there shouldn't be any compatibility problems there? What other changes do we want to address?
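   A minimal sketch of the shim from the first bullet (names follow the proposal above; this is not code from the PR):
   
   ```java
   // Kept at the old location purely for compatibility during the deprecation period.
   package kafka.common;
   
   /**
    * @deprecated Implement {@link org.apache.kafka.common.MessageReader} directly instead.
    */
   @Deprecated
   public interface MessageReader extends org.apache.kafka.common.MessageReader {
       // Intentionally empty: existing implementations of the old interface keep
       // compiling and remain usable wherever the new interface is expected.
   }
   ```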
   





[GitHub] [kafka] Hangleton commented on pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on PR #13214:
URL: https://github.com/apache/kafka/pull/13214#issuecomment-1424741428

   Thank you for your review Frederico (@fvaleri). I pushed the changes to address all your comments.




[GitHub] [kafka] fvaleri commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1104522600


##########
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##########

Review Comment:
   Thanks. This is more or less what I had in mind. I'm going to go through all tools to see if there is any further issue and then we can discuss the policy with the community and possibly have a vote.





[GitHub] [kafka] dajac commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1103557466


##########
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##########

Review Comment:
   Could we update checkstyle to have an exception?





[GitHub] [kafka] Hangleton commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1112171337


##########
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##########

Review Comment:
   Thanks @fvaleri, and apologies for the delay - I will look at the KIP!





[GitHub] [kafka] Hangleton commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1101887714


##########
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##########

Review Comment:
   Apologies, that is right. Thanks for correcting. On that note, this interface moved from `kafka.common.MessageReader` to `org.apache.kafka.tools.MessageReader`. Any external implementation will have to be updated accordingly. What is the convention to follow in this case - preserve the same package to avoid breaking dependencies, or change the package to match the new structure?





[GitHub] [kafka] fvaleri commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1102548459


##########
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##########

Review Comment:
   This is tricky. It's not a class from the public API, but it's exposed via the command line and definitely used (search for AvroMessageReader for example). I also found the still open KIP-641 which deals with this interface. 
   
   We should have a deprecation period, but at the same time we don't want to depend on core. Given that checkstyle can only handle a single root package, the only solution I see is to create a :tools:deprecated sub-module containing kafka.common.MessageReader, to be removed in Kafka 4.0.
   
   @mimaison @showuon WDYT?





[GitHub] [kafka] fvaleri commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1101811769


##########
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##########

Review Comment:
   This would be a breaking change, because there is an option that allows you to provide your own message reader implementation.
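   The reader class name travels as a plain string and is instantiated reflectively, which is why moving the interface to a new package breaks existing readers. Roughly (a sketch, not the PR's exact loading code):
   
   ```java
   import java.io.InputStream;
   import java.util.Properties;
   
   final class ReaderLoader {
       // Sketch of how a console-producer-style tool loads the class named by --line-reader.
       static MessageReader load(String readerClassName, InputStream in, Properties props) throws Exception {
           Class<?> clazz = Class.forName(readerClassName);
           MessageReader reader = (MessageReader) clazz.getDeclaredConstructor().newInstance();
           reader.init(in, props); // receives the reader properties, including "topic"
           return reader;
       }
   }
   ```
   
   An implementation compiled against `kafka.common.MessageReader` fails the cast once the tool expects `org.apache.kafka.tools.MessageReader`.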





[GitHub] [kafka] Hangleton commented on pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on PR #13214:
URL: https://github.com/apache/kafka/pull/13214#issuecomment-1505225119

   > @Hangleton should we move to RecordReader interface now and then call another round of review? Also note [this comment](https://github.com/apache/kafka/pull/13454#issuecomment-1497842365).
   
   Thanks Frederico for the follow-up and call-out. I will rebase and update the PR.




Re: [PR] KAFKA-14577: Move the scala ConsoleProducer from core to tools module [kafka]

Posted by "nizhikov (via GitHub)" <gi...@apache.org>.
nizhikov commented on PR #13214:
URL: https://github.com/apache/kafka/pull/13214#issuecomment-1750793532

   Hello @Hangleton. Do you need help with the review of this PR?
   Are you still working on it?




Re: [PR] KAFKA-14577: Move the scala ConsoleProducer from core to tools module [kafka]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13214:
URL: https://github.com/apache/kafka/pull/13214#issuecomment-1749937548

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

