You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/03 11:09:23 UTC

[GitHub] [kafka] mimaison opened a new pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

mimaison opened a new pull request #8604:
URL: https://github.com/apache/kafka/pull/8604


   This PR includes 3 MessageFormatters for MirrorMaker2 internal topics:
   - HeartbeatFormatter
   - CheckpointFormatter
   - OffsetSyncFormatter
   
   This also introduces a new public interface org.apache.kafka.common.MessageFormatter that users can implement to build custom formatters.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#issuecomment-653436024


   All failures were Streams, merging.
   
   - JDK 11 and Scala 2.13:
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   
   - JDK 14 and Scala 2.13:
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   
   - JDK 8 and Scala 2.12:
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on a change in pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#discussion_r448980591



##########
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##########
@@ -466,7 +465,9 @@ class DefaultMessageFormatter extends MessageFormatter {
   var keyDeserializer: Option[Deserializer[_]] = None
   var valueDeserializer: Option[Deserializer[_]] = None
 
-  override def init(props: Properties): Unit = {
+  override def configure(configs: Map[String, _]): Unit = {
+    val props = new java.util.Properties()
+    configs.asScala.foreach { case (key, value) => props.put(key, value.toString) }

Review comment:
       I initially was doing this but this forces casting the value to strings everywhere due to the type of the Map here `[String, _]`. Keeping the `Properties` object turned out way simpler.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#discussion_r449545905



##########
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##########
@@ -466,7 +465,9 @@ class DefaultMessageFormatter extends MessageFormatter {
   var keyDeserializer: Option[Deserializer[_]] = None
   var valueDeserializer: Option[Deserializer[_]] = None
 
-  override def init(props: Properties): Unit = {
+  override def configure(configs: Map[String, _]): Unit = {
+    val props = new java.util.Properties()
+    configs.asScala.foreach { case (key, value) => props.put(key, value.toString) }

Review comment:
       That's a good point. Probably, the correct way to do this in the future would be to define a `ConfigDef` to parse the Map. We can tackle this as part of https://github.com/apache/kafka/pull/8909.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#issuecomment-647618474


   @kkonstantine @omkreddy Can you take a look? Thanks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

Posted by GitBox <gi...@apache.org>.
mimaison merged pull request #8604:
URL: https://github.com/apache/kafka/pull/8604


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#issuecomment-653099210


   ok to test


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on a change in pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#discussion_r448982505



##########
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##########
@@ -560,20 +561,18 @@ class LoggingMessageFormatter extends MessageFormatter with LazyLogging {
 }
 
 class NoOpMessageFormatter extends MessageFormatter {
-  override def init(props: Properties): Unit = {}
 
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {}
 }
 
 class ChecksumMessageFormatter extends MessageFormatter {
   private var topicStr: String = _
 
-  override def init(props: Properties): Unit = {
-    topicStr = props.getProperty("topic")
-    if (topicStr != null)
-      topicStr = topicStr + ":"
+  override def configure(configs: Map[String, _]): Unit = {
+    topicStr = if (configs.get("topic") != null)

Review comment:
       :+1: done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#issuecomment-653051628


   If it’s a committer’s PR I think the common practice is to leave the responsibility of merging to the author of the PR. 
   
   +1


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#issuecomment-652991020


   @kkonstantine I see you approved but did not merge. Was there any reason?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#issuecomment-645910843


   ok to test


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on a change in pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#discussion_r448979596



##########
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##########
@@ -309,7 +308,7 @@ object ConsoleConsumer extends Logging {
       formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
     }
 
-    formatter.init(formatterArgs)
+    formatter.configure(formatterArgs.asScala.asJava)

Review comment:
       This trick is used in a couple of other places. I took a quick look at switching to a `Map` directly but it's used in a bunch of places so ended up being a larger change. I'd prefer keeping this PR focussed on the new formatter and maybe do some refactoring in a follow up PR. WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on a change in pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#discussion_r444504939



##########
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##########
@@ -560,16 +561,15 @@ class LoggingMessageFormatter extends MessageFormatter with LazyLogging {
 }
 
 class NoOpMessageFormatter extends MessageFormatter {
-  override def init(props: Properties): Unit = {}
 
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {}
 }
 
 class ChecksumMessageFormatter extends MessageFormatter {
   private var topicStr: String = _
 
-  override def init(props: Properties): Unit = {
-    topicStr = props.getProperty("topic")
+  override def configure(configs: Map[String, _]): Unit = {
+    topicStr = configs.get("topic").toString

Review comment:
       Good catch! I've pushed a change to address it




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#discussion_r449544703



##########
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##########
@@ -309,7 +308,7 @@ object ConsoleConsumer extends Logging {
       formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
     }
 
-    formatter.init(formatterArgs)
+    formatter.configure(formatterArgs.asScala.asJava)

Review comment:
       I was thinking about adding a second utility method `CommandLineUtils.parseKeyValueArgsAsMap` and keep the existing one unchanged. We can do this as a follow up for sure.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on a change in pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#discussion_r443970569



##########
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##########
@@ -560,16 +561,15 @@ class LoggingMessageFormatter extends MessageFormatter with LazyLogging {
 }
 
 class NoOpMessageFormatter extends MessageFormatter {
-  override def init(props: Properties): Unit = {}
 
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {}
 }
 
 class ChecksumMessageFormatter extends MessageFormatter {
   private var topicStr: String = _
 
-  override def init(props: Properties): Unit = {
-    topicStr = props.getProperty("topic")
+  override def configure(configs: Map[String, _]): Unit = {
+    topicStr = configs.get("topic").toString

Review comment:
       can we add a test?

##########
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##########
@@ -560,16 +561,15 @@ class LoggingMessageFormatter extends MessageFormatter with LazyLogging {
 }
 
 class NoOpMessageFormatter extends MessageFormatter {
-  override def init(props: Properties): Unit = {}
 
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {}
 }
 
 class ChecksumMessageFormatter extends MessageFormatter {
   private var topicStr: String = _
 
-  override def init(props: Properties): Unit = {
-    topicStr = props.getProperty("topic")
+  override def configure(configs: Map[String, _]): Unit = {
+    topicStr = configs.get("topic").toString

Review comment:
       do we risk NPE here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#discussion_r448178095



##########
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##########
@@ -560,20 +561,18 @@ class LoggingMessageFormatter extends MessageFormatter with LazyLogging {
 }
 
 class NoOpMessageFormatter extends MessageFormatter {
-  override def init(props: Properties): Unit = {}
 
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {}
 }
 
 class ChecksumMessageFormatter extends MessageFormatter {
   private var topicStr: String = _
 
-  override def init(props: Properties): Unit = {
-    topicStr = props.getProperty("topic")
-    if (topicStr != null)
-      topicStr = topicStr + ":"
+  override def configure(configs: Map[String, _]): Unit = {
+    topicStr = if (configs.get("topic") != null)

Review comment:
       nit: We could use `containsKey`.

##########
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##########
@@ -309,7 +308,7 @@ object ConsoleConsumer extends Logging {
       formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
     }
 
-    formatter.init(formatterArgs)
+    formatter.configure(formatterArgs.asScala.asJava)

Review comment:
       `asScala.asJava` looks a bit weird here. I understand that you are using this trick to convert the Properties to a Scala Map and then a Java Map. How about loading `formatterArgs` as a Map directly?

##########
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##########
@@ -466,7 +465,9 @@ class DefaultMessageFormatter extends MessageFormatter {
   var keyDeserializer: Option[Deserializer[_]] = None
   var valueDeserializer: Option[Deserializer[_]] = None
 
-  override def init(props: Properties): Unit = {
+  override def configure(configs: Map[String, _]): Unit = {
+    val props = new java.util.Properties()
+    configs.asScala.foreach { case (key, value) => props.put(key, value.toString) }

Review comment:
       Can't we use `configs` directly instead of building `props`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org