Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/29 17:45:54 UTC

[GitHub] [kafka] badaiaqrandista opened a new pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

badaiaqrandista opened a new pull request #9099:
URL: https://github.com/apache/kafka/pull/9099


   Implementation of KIP-431 - Support of printing additional ConsumerRecord fields in DefaultMessageFormatter
   
   https://cwiki.apache.org/confluence/display/KAFKA/KIP-431%3A+Support+of+printing+additional+ConsumerRecord+fields+in+DefaultMessageFormatter
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] bbejeck merged pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
bbejeck merged pull request #9099:
URL: https://github.com/apache/kafka/pull/9099


   





[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-678681825


   Ok to test





[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-678681778


   retest this please





[GitHub] [kafka] badaiaqrandista commented on a change in pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on a change in pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#discussion_r465105897



##########
File path: core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala
##########
@@ -0,0 +1,235 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package unit.kafka.tools
+
+import java.io.{ByteArrayOutputStream, Closeable, PrintStream}
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.Properties
+
+import kafka.tools.DefaultMessageFormatter
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.Header
+import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
+import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.serialization.Deserializer
+import org.junit.Assert._
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+
+import scala.jdk.CollectionConverters._
+
+@RunWith(value = classOf[Parameterized])
+class DefaultMessageFormatterTest(name: String, record: ConsumerRecord[Array[Byte], Array[Byte]], properties: Map[String, String], expected: String) {
+  import DefaultMessageFormatterTest._
+
+  @Test
+  def testWriteRecord()= {
+    withResource(new ByteArrayOutputStream()) { baos =>
+      withResource(new PrintStream(baos)) { ps =>
+        val formatter = buildFormatter(properties)
+        formatter.writeTo(record, ps)
+        val actual = new String(baos.toByteArray(), StandardCharsets.UTF_8)
+        assertEquals(expected, actual)
+
+      }
+    }
+  }
+}
+
+object DefaultMessageFormatterTest {
+  @Parameters(name = "Test {index} - {0}")
+  def parameters: java.util.Collection[Array[Object]] = {
+    Seq(
+      Array(
+        "print nothing",
+        consumerRecord(),
+        Map("print.value" -> "false"),
+        ""),
+      Array(
+        "print key",
+        consumerRecord(),
+        Map("print.key" -> "true", "print.value" -> "false"),
+        "someKey\n"),
+      Array(
+        "print value",
+        consumerRecord(),
+        Map(),
+        "someValue\n"),
+      Array(
+        "print empty timestamp",
+        consumerRecord(timestampType = TimestampType.NO_TIMESTAMP_TYPE),
+        Map("print.timestamp" -> "true", "print.value" -> "false"),
+        "NO_TIMESTAMP\n"),
+      Array(
+        "print log append time timestamp",
+        consumerRecord(timestampType = TimestampType.LOG_APPEND_TIME),
+        Map("print.timestamp" -> "true", "print.value" -> "false"),
+        "LogAppendTime:1234\n"),
+      Array(
+        "print create time timestamp",
+        consumerRecord(timestampType = TimestampType.CREATE_TIME),
+        Map("print.timestamp" -> "true", "print.value" -> "false"),
+        "CreateTime:1234\n"),
+      Array(
+        "print partition",
+        consumerRecord(),
+        Map("print.partition" -> "true", "print.value" -> "false"),
+        "Partition:9\n"),
+      Array(
+        "print offset",
+        consumerRecord(),
+        Map("print.offset" -> "true", "print.value" -> "false"),
+        "Offset:9876\n"),
+      Array(
+        "print headers",
+        consumerRecord(),
+        Map("print.headers" -> "true", "print.value" -> "false"),
+        "h1:v1,h2:v2\n"),
+      Array(
+        "print empty headers",
+        consumerRecord(headers = Nil),
+        Map("print.headers" -> "true", "print.value" -> "false"),
+        "NO_HEADERS\n"),
+      Array(
+        "print all possible fields with default delimiters",
+        consumerRecord(),
+        Map("print.key" -> "true",

Review comment:
       @dajac 
   
   I've re-indented these as well.
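
The test quoted above calls a `withResource` helper imported from the companion object, but its definition is not part of the quoted diff. As a rough sketch only, a loan-pattern helper of that shape could look like the following. The `ResourceSketch` object and the `capture` method are hypothetical names introduced for illustration, and the PR's actual helper may differ.

```scala
import java.io.{ByteArrayOutputStream, Closeable, PrintStream}
import java.nio.charset.StandardCharsets

// Hypothetical object; not taken from the PR.
object ResourceSketch {
  // Loan pattern: run `handler` with the resource, then always close it,
  // even if `handler` throws.
  def withResource[R <: Closeable, A](resource: R)(handler: R => A): A =
    try handler(resource)
    finally resource.close()

  // Collects everything written to a PrintStream and returns it as a UTF-8 string.
  def capture(write: PrintStream => Unit): String =
    withResource(new ByteArrayOutputStream()) { baos =>
      withResource(new PrintStream(baos)) { ps =>
        write(ps)
        ps.flush()
      }
      // The PrintStream is closed at this point, so all bytes are in `baos`.
      new String(baos.toByteArray, StandardCharsets.UTF_8)
    }
}
```

With a helper like this, the assertion in `testWriteRecord` reduces to comparing `capture(ps => formatter.writeTo(record, ps))` against the expected string.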







[GitHub] [kafka] bbejeck removed a comment on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
bbejeck removed a comment on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-678681778


   retest this please





[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-703928924


   Java 11 and Java 15 passed.
   
   Java 8 failed with `org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState`, a known flaky test.
   





[GitHub] [kafka] dajac commented on a change in pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#discussion_r462306541



##########
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##########
@@ -459,48 +466,32 @@ class DefaultMessageFormatter extends MessageFormatter {
   var printKey = false
   var printValue = true
   var printPartition = false
-  var keySeparator = "\t".getBytes(StandardCharsets.UTF_8)
-  var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8)
+  var printOffset = false
+  var printHeaders = false
+  var keySeparator = utfBytes("\t")
+  var lineSeparator = utfBytes("\n")
+  var headersSeparator = utfBytes(",")
+  var nullLiteral = utfBytes("null")
 
   var keyDeserializer: Option[Deserializer[_]] = None
   var valueDeserializer: Option[Deserializer[_]] = None
-
-  override def configure(configs: Map[String, _]): Unit = {
-    val props = new java.util.Properties()
-    configs.asScala.foreach { case (key, value) => props.put(key, value.toString) }
-    if (props.containsKey("print.timestamp"))
-      printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true")
-    if (props.containsKey("print.key"))
-      printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true")
-    if (props.containsKey("print.value"))
-      printValue = props.getProperty("print.value").trim.equalsIgnoreCase("true")
-    if (props.containsKey("print.partition"))
-      printPartition = props.getProperty("print.partition").trim.equalsIgnoreCase("true")
-    if (props.containsKey("key.separator"))
-      keySeparator = props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8)
-    if (props.containsKey("line.separator"))
-      lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8)
-    // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
-    if (props.containsKey("key.deserializer")) {
-      keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).getDeclaredConstructor()
-        .newInstance().asInstanceOf[Deserializer[_]])
-      keyDeserializer.get.configure(propertiesWithKeyPrefixStripped("key.deserializer.", props).asScala.asJava, true)
-    }
-    // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
-    if (props.containsKey("value.deserializer")) {
-      valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).getDeclaredConstructor()
-        .newInstance().asInstanceOf[Deserializer[_]])
-      valueDeserializer.get.configure(propertiesWithKeyPrefixStripped("value.deserializer.", props).asScala.asJava, false)
-    }
-  }
-
-  private def propertiesWithKeyPrefixStripped(prefix: String, props: Properties): Properties = {
-    val newProps = new Properties()
-    props.asScala.foreach { case (key, value) =>
-      if (key.startsWith(prefix) && key.length > prefix.length)
-        newProps.put(key.substring(prefix.length), value)
-    }
-    newProps
+  var headersDeserializer: Option[Deserializer[_]] = None
+
+  override def init(props: Properties): Unit = {

Review comment:
       `init(props: Properties)` has been deprecated. It would be great if we could keep using `configure(configs: Map[String, _])` as before. I think that we should also try to directly extract the values from the `Map` instead of using a `Properties`.
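
To illustrate the suggestion above, here is a minimal sketch of reading settings straight from the `Map` passed to `configure`, without copying them into a `Properties` first. The `FormatterConfigSketch` object and its `getBool` and `getBytes` helpers are hypothetical names, not code from this PR. The parsing rules (trim and case-insensitive `true` for flags, UTF-8 bytes for separators) mirror the removed lines in the diff.

```scala
import java.nio.charset.StandardCharsets
import java.util.{Map => JMap}

// Hypothetical helper object; names are illustrative, not taken from the PR.
object FormatterConfigSketch {
  // Reads a boolean flag, falling back to `default` when the key is absent.
  def getBool(configs: JMap[String, _], key: String, default: Boolean): Boolean =
    if (configs.containsKey(key)) configs.get(key).toString.trim.equalsIgnoreCase("true")
    else default

  // Reads a separator as UTF-8 bytes, falling back to `default` when the key is absent.
  def getBytes(configs: JMap[String, _], key: String, default: Array[Byte]): Array[Byte] =
    if (configs.containsKey(key)) configs.get(key).toString.getBytes(StandardCharsets.UTF_8)
    else default
}
```

Inside `configure`, each field could then be assigned in one line, for example `printOffset = getBool(configs, "print.offset", printOffset)`, which keeps the current value as the default.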

##########
File path: core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala
##########
@@ -0,0 +1,235 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package unit.kafka.tools
+
+import java.io.{ByteArrayOutputStream, Closeable, PrintStream}
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.Properties
+
+import kafka.tools.DefaultMessageFormatter
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.Header
+import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
+import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.serialization.Deserializer
+import org.junit.Assert._
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+
+import scala.jdk.CollectionConverters._
+
+@RunWith(value = classOf[Parameterized])
+class DefaultMessageFormatterTest(name: String, record: ConsumerRecord[Array[Byte], Array[Byte]], properties: Map[String, String], expected: String) {
+  import DefaultMessageFormatterTest._
+
+  @Test
+  def testWriteRecord()= {
+    withResource(new ByteArrayOutputStream()) { baos =>
+      withResource(new PrintStream(baos)) { ps =>
+        val formatter = buildFormatter(properties)
+        formatter.writeTo(record, ps)
+        val actual = new String(baos.toByteArray(), StandardCharsets.UTF_8)
+        assertEquals(expected, actual)
+
+      }
+    }
+  }
+}
+
+object DefaultMessageFormatterTest {
+  @Parameters(name = "Test {index} - {0}")
+  def parameters: java.util.Collection[Array[Object]] = {
+    Seq(
+      Array(
+        "print nothing",
+        consumerRecord(),
+        Map("print.value" -> "false"),
+        ""),
+      Array(
+        "print key",
+        consumerRecord(),
+        Map("print.key" -> "true", "print.value" -> "false"),
+        "someKey\n"),
+      Array(
+        "print value",
+        consumerRecord(),
+        Map(),
+        "someValue\n"),
+      Array(
+        "print empty timestamp",
+        consumerRecord(timestampType = TimestampType.NO_TIMESTAMP_TYPE),
+        Map("print.timestamp" -> "true", "print.value" -> "false"),
+        "NO_TIMESTAMP\n"),
+      Array(
+        "print log append time timestamp",
+        consumerRecord(timestampType = TimestampType.LOG_APPEND_TIME),
+        Map("print.timestamp" -> "true", "print.value" -> "false"),
+        "LogAppendTime:1234\n"),
+      Array(
+        "print create time timestamp",
+        consumerRecord(timestampType = TimestampType.CREATE_TIME),
+        Map("print.timestamp" -> "true", "print.value" -> "false"),
+        "CreateTime:1234\n"),
+      Array(
+        "print partition",
+        consumerRecord(),
+        Map("print.partition" -> "true", "print.value" -> "false"),
+        "Partition:9\n"),
+      Array(
+        "print offset",
+        consumerRecord(),
+        Map("print.offset" -> "true", "print.value" -> "false"),
+        "Offset:9876\n"),
+      Array(
+        "print headers",
+        consumerRecord(),
+        Map("print.headers" -> "true", "print.value" -> "false"),
+        "h1:v1,h2:v2\n"),
+      Array(
+        "print empty headers",
+        consumerRecord(headers = Nil),
+        Map("print.headers" -> "true", "print.value" -> "false"),
+        "NO_HEADERS\n"),
+      Array(
+        "print all possible fields with default delimiters",
+        consumerRecord(),
+        Map("print.key" -> "true",

Review comment:
       nit: Move `print.key` to next line to remain consistent with the formatting of the other Maps.







[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-679258872


   > I should just delete the test for DefaultMessageFormatter in ConsoleConsumerTest.scala.
   
   @badaiaqrandista
   
   I'm not sure.  My vote would be to keep the test but move it over to the `DefaultMessageFormatterTest.scala` class.  
   
   But I'm not familiar enough with this code to say for sure.  From a quick look at the old test, it's not clear to me why it failed.  I guess the `Partition number` gets printed by default now?
   
   cc @dajac 





[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-678691061


   retest this please.





[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-704322552


   Merged #9099 into trunk





[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-678682048


   ok to test





[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-703675276


   retest this please





[GitHub] [kafka] dajac commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-701952173


   Sorry guys. I had completely forgotten about this one. I think that it is fine to keep both tests as suggested by @badaiaqrandista.





[GitHub] [kafka] badaiaqrandista commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-678834247


   @bbejeck 
   
   One question: As part of my PR, I added a new test for the DefaultMessageFormatter class: core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala
   
   But I found that there is another unit test that includes a test for the DefaultMessageFormatter class:
   https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala#L501
   
   This was the cause of the system test failure yesterday. I changed the latter test to work with the updated DefaultMessageFormatter. But after thinking about it further, I should just delete the test for DefaultMessageFormatter in ConsoleConsumerTest.scala.
   
   What do you think?





[GitHub] [kafka] badaiaqrandista commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-679398619


   @bbejeck 
   
   It failed because the partition number used to be printed as a single integer after the value. I moved the partition to be before the key (if printed) and value, and also added the prefix "Partition:" to differentiate it from "Offset:".
   
   It's only printed if "print.partition=true".
   
   I will keep the tests as is then. ConsoleConsumerTest is a unit test for the class while DefaultMessageFormatterTest is more of an integration test.
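
As an illustration of the layout described above, the following sketch builds one output line with the partition and offset placed before the key and value. `RecordSketch` and `LineLayoutSketch` are hypothetical stand-ins, not Kafka classes. The prefixes and the `NO_HEADERS` literal come from the expected strings in the quoted test. The relative order of partition, offset and headers and the tab separator between fields are assumptions, and the timestamp field is left out for brevity.

```scala
// Hypothetical, simplified stand-in for a consumer record; not Kafka's ConsumerRecord.
final case class RecordSketch(
  partition: Int,
  offset: Long,
  headers: Seq[(String, String)],
  key: String,
  value: String)

object LineLayoutSketch {
  // Builds one output line. Field order and separator are assumptions (see lead-in).
  def format(r: RecordSketch,
             printPartition: Boolean = false,
             printOffset: Boolean = false,
             printHeaders: Boolean = false,
             printKey: Boolean = false,
             printValue: Boolean = true,
             separator: String = "\t"): String = {
    val headerText =
      if (r.headers.isEmpty) "NO_HEADERS"
      else r.headers.map { case (k, v) => s"$k:$v" }.mkString(",")
    // Each enabled field contributes one entry; disabled fields are dropped.
    val fields = Seq(
      if (printPartition) Some(s"Partition:${r.partition}") else None,
      if (printOffset) Some(s"Offset:${r.offset}") else None,
      if (printHeaders) Some(headerText) else None,
      if (printKey) Some(r.key) else None,
      if (printValue) Some(r.value) else None
    ).flatten
    // Nothing enabled means nothing is written, not even a line separator.
    if (fields.isEmpty) "" else fields.mkString(separator) + "\n"
  }
}
```

Because every flag except `printValue` defaults to `false`, existing consumers that set no properties would still see only the value, which matches the "print value" case in the test.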





[GitHub] [kafka] badaiaqrandista commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-703939623


   @bbejeck Do I need to do anything on this?





[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-701612365


   @badaiaqrandista can you rebase this PR?   We have a little bit of time to get this into 2.7 before the code freeze on 10/21





[GitHub] [kafka] badaiaqrandista commented on a change in pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on a change in pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#discussion_r465105476



##########
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##########
@@ -459,48 +466,32 @@ class DefaultMessageFormatter extends MessageFormatter {
   var printKey = false
   var printValue = true
   var printPartition = false
-  var keySeparator = "\t".getBytes(StandardCharsets.UTF_8)
-  var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8)
+  var printOffset = false
+  var printHeaders = false
+  var keySeparator = utfBytes("\t")
+  var lineSeparator = utfBytes("\n")
+  var headersSeparator = utfBytes(",")
+  var nullLiteral = utfBytes("null")
 
   var keyDeserializer: Option[Deserializer[_]] = None
   var valueDeserializer: Option[Deserializer[_]] = None
-
-  override def configure(configs: Map[String, _]): Unit = {
-    val props = new java.util.Properties()
-    configs.asScala.foreach { case (key, value) => props.put(key, value.toString) }
-    if (props.containsKey("print.timestamp"))
-      printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true")
-    if (props.containsKey("print.key"))
-      printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true")
-    if (props.containsKey("print.value"))
-      printValue = props.getProperty("print.value").trim.equalsIgnoreCase("true")
-    if (props.containsKey("print.partition"))
-      printPartition = props.getProperty("print.partition").trim.equalsIgnoreCase("true")
-    if (props.containsKey("key.separator"))
-      keySeparator = props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8)
-    if (props.containsKey("line.separator"))
-      lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8)
-    // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
-    if (props.containsKey("key.deserializer")) {
-      keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).getDeclaredConstructor()
-        .newInstance().asInstanceOf[Deserializer[_]])
-      keyDeserializer.get.configure(propertiesWithKeyPrefixStripped("key.deserializer.", props).asScala.asJava, true)
-    }
-    // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
-    if (props.containsKey("value.deserializer")) {
-      valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).getDeclaredConstructor()
-        .newInstance().asInstanceOf[Deserializer[_]])
-      valueDeserializer.get.configure(propertiesWithKeyPrefixStripped("value.deserializer.", props).asScala.asJava, false)
-    }
-  }
-
-  private def propertiesWithKeyPrefixStripped(prefix: String, props: Properties): Properties = {
-    val newProps = new Properties()
-    props.asScala.foreach { case (key, value) =>
-      if (key.startsWith(prefix) && key.length > prefix.length)
-        newProps.put(key.substring(prefix.length), value)
-    }
-    newProps
+  var headersDeserializer: Option[Deserializer[_]] = None
+
+  override def init(props: Properties): Unit = {

Review comment:
       @dajac 
   
   I have replaced `init` with `configure` and changed code to extract the values directly from `Map`. Please review again.







[GitHub] [kafka] badaiaqrandista commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-673156754


   @dajac Can you please re-review the changes I've made?





[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-704322766


   Thanks for the contribution @badaiaqrandista!





[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-703675565


   ok to test





[GitHub] [kafka] badaiaqrandista commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-678641314


   @bbejeck Thank you for including the changes in the test. I've fixed the errors found by the test. Please test again.





[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-703947094


   >Do I need to do anything on this?
   
   @badaiaqrandista, nope. As soon as we can get a green build, I'll merge it.





[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-677743288


   Ok to test

