Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/03 16:50:43 UTC

[GitHub] [kafka] dajac commented on a change in pull request #11456: KAFKA-13351: Add possibility to write kafka headers in Kafka Console Producer

dajac commented on a change in pull request #11456:
URL: https://github.com/apache/kafka/pull/11456#discussion_r762081776



##########
File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala
##########
@@ -274,28 +296,74 @@ object ConsoleProducer {
         parseKey = props.getProperty("parse.key").trim.equalsIgnoreCase("true")
       if (props.containsKey("key.separator"))
         keySeparator = props.getProperty("key.separator")
+      if (props.containsKey("parse.headers"))
+        parseHeader = props.getProperty("parse.headers").trim.equalsIgnoreCase("true")
+      if (props.containsKey("headers.delimiter"))
+        headersDelimiter = props.getProperty("headers.delimiter")
+      if (props.containsKey("headers.separator"))
+        headersSeparator = props.getProperty("headers.separator")
+      if (props.containsKey("headers.key.separator"))
+        headerKeySeparator = props.getProperty("headers.key.separator")
       if (props.containsKey("ignore.error"))
         ignoreError = props.getProperty("ignore.error").trim.equalsIgnoreCase("true")
       reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
     }
 
     override def readMessage() = {
       lineNumber += 1
-      if (printPrompt)
-        print(">")
-      (reader.readLine(), parseKey) match {
-        case (null, _) => null
-        case (line, true) =>
-          line.indexOf(keySeparator) match {
-            case -1 =>
-              if (ignoreError) new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
-              else throw new KafkaException(s"No key found on line $lineNumber: $line")
-            case n =>
-              val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes(StandardCharsets.UTF_8)
-              new ProducerRecord(topic, line.substring(0, n).getBytes(StandardCharsets.UTF_8), value)
-          }
-        case (line, false) =>
-          new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
+      if (printPrompt) print(">")
+      val line = reader.readLine()
+      try{
+        (line, parseKey, parseHeader) match {
+          case (null, _, _) => null
+          case (line, true, true) =>
+            val Array(headers, key, value) = line.split("[" + headersDelimiter + keySeparator + "]")

Review comment:
      The downsides of using `split` with a regex are that it has to compile the regular expression for every single line, allocate an array, and deconstruct the array. That seems a bit wasteful here given that we only need to split twice. Did you consider using two `indexOf` calls in conjunction with `substring` to extract the header part, the key part, and finally the value part?
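      For illustration, a rough sketch of that approach (the helper name, signature, and `Option` return are hypothetical, not part of this PR):

```scala
// Hypothetical sketch of the indexOf/substring approach, assuming the line layout is
// <headers><headersDelimiter><key><keySeparator><value>.
def splitHeadersKeyValue(line: String, headersDelimiter: String, keySeparator: String): Option[(String, String, String)] = {
  val headersEnd = line.indexOf(headersDelimiter)
  if (headersEnd < 0) return None
  val keyStart = headersEnd + headersDelimiter.length
  val keyEnd = line.indexOf(keySeparator, keyStart)
  if (keyEnd < 0) return None
  Some((
    line.substring(0, headersEnd),               // headers part, e.g. "h1:v1,h2:v2"
    line.substring(keyStart, keyEnd),            // key part
    line.substring(keyEnd + keySeparator.length) // value part (may be empty)
  ))
}
```

      That avoids compiling a regex and allocating an array for every line, and only scans the string once per separator.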

##########
File path: core/src/test/scala/unit/kafka/tools/LineMessageReaderTest.scala
##########
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.tools
+
+import org.apache.kafka.common.KafkaException
+import kafka.tools.ConsoleProducer.LineMessageReader
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.header.Header
+import org.apache.kafka.common.header.internals.RecordHeader
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.Test
+
+import java.io.ByteArrayInputStream
+import java.lang
+import java.util.Arrays.asList
+import java.util.Properties
+
+class LineMessageReaderTest {
+
+  private def defaultTestProps = {
+    val props = new Properties()
+    props.put("topic", "topic")
+    props.put("parse.key", "true")
+    props.put("parse.headers", "true")
+    props
+  }
+
+
+  @Test
+  def testLineReader(): Unit = {
+    val lineReader = new LineMessageReader();

Review comment:
       I really like all the tests! However, they are quite repetitive. One way to reduce the boilerplate code would be to define a helper method which runs the test case. The method would take the input line, the properties, and the expected results.
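      For example, something along these lines (the name and signature are only a suggestion, assuming `LineMessageReader` is initialised via `init(InputStream, Properties)`):

```scala
// Hypothetical helper to cut down on the repetition; name and signature are a suggestion only.
// Needs java.nio.charset.StandardCharsets imported.
private def assertRecords(input: String, props: Properties, expected: (String, String)*): Unit = {
  val reader = new LineMessageReader()
  reader.init(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)), props)
  expected.foreach { case (expectedKey, expectedValue) =>
    val record = reader.readMessage()
    assertEquals(expectedKey, new String(record.key(), StandardCharsets.UTF_8))
    assertEquals(expectedValue, new String(record.value(), StandardCharsets.UTF_8))
  }
}
```

      A parameter for the expected headers could be added in the same way for the header cases.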

##########
File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala
##########
@@ -274,28 +296,74 @@ object ConsoleProducer {
         parseKey = props.getProperty("parse.key").trim.equalsIgnoreCase("true")
       if (props.containsKey("key.separator"))
         keySeparator = props.getProperty("key.separator")
+      if (props.containsKey("parse.headers"))
+        parseHeader = props.getProperty("parse.headers").trim.equalsIgnoreCase("true")
+      if (props.containsKey("headers.delimiter"))
+        headersDelimiter = props.getProperty("headers.delimiter")
+      if (props.containsKey("headers.separator"))
+        headersSeparator = props.getProperty("headers.separator")
+      if (props.containsKey("headers.key.separator"))
+        headerKeySeparator = props.getProperty("headers.key.separator")
       if (props.containsKey("ignore.error"))
         ignoreError = props.getProperty("ignore.error").trim.equalsIgnoreCase("true")
       reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
     }
 
     override def readMessage() = {
       lineNumber += 1
-      if (printPrompt)
-        print(">")
-      (reader.readLine(), parseKey) match {
-        case (null, _) => null
-        case (line, true) =>
-          line.indexOf(keySeparator) match {
-            case -1 =>
-              if (ignoreError) new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
-              else throw new KafkaException(s"No key found on line $lineNumber: $line")
-            case n =>
-              val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes(StandardCharsets.UTF_8)
-              new ProducerRecord(topic, line.substring(0, n).getBytes(StandardCharsets.UTF_8), value)
-          }
-        case (line, false) =>
-          new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
+      if (printPrompt) print(">")
+      val line = reader.readLine()
+      try{
+        (line, parseKey, parseHeader) match {
+          case (null, _, _) => null
+          case (line, true, true) =>
+            val Array(headers, key, value) = line.split("[" + headersDelimiter + keySeparator + "]")
+            new ProducerRecord(topic, null, null, key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8), mapHeaders(headers))
+          case (line, true, false) =>
+            val Array(key, value) = line.split("[" + keySeparator + "]")
+            new ProducerRecord(topic, key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8))
+          case (line, false, true) =>
+            val Array(headers, value) = line.split("[" + headersDelimiter + "]")
+            new ProducerRecord(topic, null, null, null, value.getBytes(StandardCharsets.UTF_8), mapHeaders(headers))
+          case (line, false, false) => new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
+        }
+      } catch {
+        case _: MatchError => onMatchError(line)
+      }
+    }
+
+    private def onMatchError(line: String): ProducerRecord[Array[Byte], Array[Byte]] = {
+      if (ignoreError) {
+        new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
+      } else {
+        throw new KafkaException("Could not parse line " + lineNumber + ", most likely line does not match pattern: " + illustratePattern())
+      }
+    }
+
+    def mapHeaders(headers: String): lang.Iterable[Header] = {
+      asList(
+        headers.split(headersSeparator)
+          .map(_.split(headerKeySeparator))
+          .map(keyValue => new RecordHeader(keyValue(0), keyValue(1).getBytes(StandardCharsets.UTF_8)))
+          : _*
+      )

Review comment:
       What does it do if the header is malformed?
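      For instance, with the default separators, an input like the one below has no `:` in the second header, so `keyValue(1)` would throw an `ArrayIndexOutOfBoundsException` rather than a `KafkaException`, and the `MatchError` handling in `readMessage` would not catch it (assumed input, just to illustrate the question):

```scala
// Assumed example input using the default separators; the second header has no ':'.
val headers = "h1:v1,h2"
val parsed = headers.split(",").map(_.split(":"))
// parsed is Array(Array("h1", "v1"), Array("h2")), so keyValue(1) fails for the second header.
```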

##########
File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala
##########
@@ -208,9 +213,22 @@ object ConsoleProducer {
       .defaultsTo(1024*100)
     val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " +
       "This allows custom configuration for a user-defined message reader. Default properties include:\n" +
-      "\tparse.key=true|false\n" +
-      "\tkey.separator=<key.separator>\n" +
-      "\tignore.error=true|false")
+      "\tparse.key=false\n" +
+      "\tparse.headers=false\n" +
+      "\tignore.error=false\n" +
+      "Default parsing pattern when:\n" +
+      "\tparse.headers=true & parse.key=true:\n" +
+      "\t \"h1:v1,h2...\\tkey\\tvalue\"\n" +
+      "\tparse.headers=false & parse.key=true:\n" +
+      "\t \"key\\tvalue\"\n" +
+      "\tparse.headers=true & parse.key=false:\n" +
+      "\t \"h1:v1,h2...\\tvalue\"\n" +
+      "Customize pattern via (defaults shown)\n" +
+      "\tkey.separator=\\t\n" +
+      "\theaders.delimiter=\\t\n" +
+      "\theaders.separator=,\n" +
+      "\theaders.key.separator=:"
+      )

Review comment:
      This is quite hard to read. You could use a multiline string in Scala with `"""   """`. You can take a look at how we did it in `ConsoleConsumer`.

      Also, it would be great to have all the parameters at the top, followed by the examples.
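      Something roughly like this, as a sketch (parameters first, then the examples; the exact wording and layout are illustrative only):

```scala
// Rough sketch only, not a final wording.
val propertyOpt = parser.accepts("property",
  """A mechanism to pass user-defined properties in the form key=value to the message reader.
    |This allows custom configuration for a user-defined message reader.
    |Default properties include:
    | parse.key=false
    | parse.headers=false
    | ignore.error=false
    | key.separator=\t
    | headers.delimiter=\t
    | headers.separator=,
    | headers.key.separator=:
    |Default parsing pattern when:
    | parse.headers=true & parse.key=true:
    |  "h1:v1,h2...\tkey\tvalue"
    | parse.headers=false & parse.key=true:
    |  "key\tvalue"
    | parse.headers=true & parse.key=false:
    |  "h1:v1,h2...\tvalue"
    |""".stripMargin)
```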



