You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/04/16 03:51:58 UTC

[kafka] branch trunk updated: KAFKA-6592: Follow-up (#4864)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9871357  KAFKA-6592: Follow-up (#4864)
9871357 is described below

commit 98713570867619de7de8e885a82d5fb0bfdb3b4e
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Sun Apr 15 20:51:53 2018 -0700

    KAFKA-6592: Follow-up (#4864)
    
    Do not require ConsoleConsumer to specify the inner serde as a special property; treat it as just a normal property of the message formatter.
---
 .../org/apache/kafka/test/MockDeserializer.java    |  5 +++
 .../main/scala/kafka/tools/ConsoleConsumer.scala   | 50 +++++++++-------------
 .../unit/kafka/tools/ConsoleConsumerTest.scala     | 16 ++++---
 .../KStreamAggregationIntegrationTest.java         |  2 +-
 4 files changed, 35 insertions(+), 38 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java b/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
index 99551f7..ac2865e 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
@@ -31,6 +31,9 @@ public class MockDeserializer implements ClusterResourceListener, Deserializer<b
     public static ClusterResource noClusterId = new ClusterResource("no_cluster_id");
     public static AtomicReference<ClusterResource> clusterIdBeforeDeserialize = new AtomicReference<>(noClusterId);
 
+    public boolean isKey;
+    public Map<String, ?> configs;
+
     public static void resetStaticVariables() {
         initCount = new AtomicInteger(0);
         closeCount = new AtomicInteger(0);
@@ -44,6 +47,8 @@ public class MockDeserializer implements ClusterResourceListener, Deserializer<b
 
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
+        this.configs = configs;
+        this.isKey = isKey;
     }
 
     @Override
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 9df4fb4..5139324 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -19,8 +19,9 @@ package kafka.tools
 
 import java.io.PrintStream
 import java.nio.charset.StandardCharsets
+import java.util
 import java.util.concurrent.CountDownLatch
-import java.util.{Locale, Properties, Random}
+import java.util.{Locale, Map, Properties, Random}
 
 import com.typesafe.scalalogging.LazyLogging
 import joptsimple._
@@ -46,11 +47,6 @@ import scala.collection.JavaConverters._
 object ConsoleConsumer extends Logging {
 
   var messageCount = 0
-  // Keep same names with StreamConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
-  // and StreamConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
-  // visible for testing
-  private[tools] val innerKeySerdeName = "default.windowed.key.serde.inner"
-  private[tools] val innerValueSerdeName = "default.windowed.value.serde.inner"
 
   private val shutdownLatch = new CountDownLatch(1)
 
@@ -306,8 +302,8 @@ object ConsoleConsumer extends Logging {
         "\tline.separator=<line.separator>\n" +
         "\tkey.deserializer=<key.deserializer>\n" +
         "\tvalue.deserializer=<value.deserializer>\n" +
-        "\tdefault.windowed.key.serde.inner=<windowed.key.serde.inner>\n" +
-        "\tdefault.windowed.value.serde.inner=<windowed.value.serde.inner>")
+        "\nUsers can also pass in customized properties for their formatter; more specifically, users " +
+        "can pass in properties keyed with \'key.deserializer.\' and \'value.deserializer.\' prefixes to configure their deserializers.")
       .withRequiredArg
       .describedAs("prop")
       .ofType(classOf[String])
@@ -344,18 +340,6 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("deserializer for values")
       .ofType(classOf[String])
-    val innerKeyDeserializerOpt = parser.accepts(innerKeySerdeName,
-      "inner serde for key when windowed deserialzier is used; would be ignored otherwise. " +
-        "For example: org.apache.kafka.common.serialization.Serdes\\$StringSerde")
-      .withRequiredArg
-      .describedAs("inner serde for key")
-      .ofType(classOf[String])
-    val innerValueDeserializerOpt = parser.accepts(innerValueSerdeName,
-      "inner serde for value when windowed deserialzier is used; would be ignored otherwise. " +
-        "For example: org.apache.kafka.common.serialization.Serdes\\$StringSerde")
-      .withRequiredArg
-      .describedAs("inner serde for values")
-      .ofType(classOf[String])
     val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
                                                        "Log lifecycle events of the consumer in addition to logging consumed " +
                                                        "messages. (This is specific for system tests.)")
@@ -400,8 +384,6 @@ object ConsoleConsumer extends Logging {
     val bootstrapServer = options.valueOf(bootstrapServerOpt)
     val keyDeserializer = options.valueOf(keyDeserializerOpt)
     val valueDeserializer = options.valueOf(valueDeserializerOpt)
-    val innerKeyDeserializer = options.valueOf(innerKeyDeserializerOpt)
-    val innerValueDeserializer = options.valueOf(innerValueDeserializerOpt)
     val isolationLevel = options.valueOf(isolationLevelOpt).toString
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
 
@@ -411,12 +393,6 @@ object ConsoleConsumer extends Logging {
     if (valueDeserializer != null && !valueDeserializer.isEmpty) {
       formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
     }
-    if (innerKeyDeserializer != null && !innerKeyDeserializer.isEmpty) {
-      formatterArgs.setProperty(innerKeySerdeName, innerKeyDeserializer)
-    }
-    if (innerValueDeserializer != null && !innerValueDeserializer.isEmpty) {
-      formatterArgs.setProperty(innerValueSerdeName, innerValueDeserializer)
-    }
 
     formatter.init(formatterArgs)
 
@@ -560,15 +536,29 @@ class DefaultMessageFormatter extends MessageFormatter {
     // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
     if (props.containsKey("key.deserializer")) {
       keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
-      keyDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava, true)
+      keyDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(stripWithPrefix("key.deserializer.", props)).asJava, true)
     }
     // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
     if (props.containsKey("value.deserializer")) {
       valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
-      valueDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava, false)
+      valueDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(stripWithPrefix("value.deserializer.", props)).asJava, false)
     }
   }
 
+  def stripWithPrefix(prefix: String, props: Properties): Properties = {
+    val newProps = new Properties()
+    import scala.collection.JavaConversions._
+    for (entry <- props) {
+      val key: String = entry._1
+      val value: String = entry._2
+
+      if (key.startsWith(prefix) && key.length > prefix.length)
+        newProps.put(key.substring(prefix.length), value)
+    }
+
+    newProps
+  }
+
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
 
     def writeSeparator(columnSeparator: Boolean): Unit = {
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index f5195c3..6f46555 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -25,7 +25,7 @@ import kafka.utils.{Exit, TestUtils}
 import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.test.MockDeserializer
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{Before, Test}
@@ -545,15 +545,17 @@ class ConsoleConsumerTest {
       "--bootstrap-server", "localhost:9092",
       "--topic", "test",
       "--property", "print.key=true",
-      "--property", "key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer",
-      "--" + ConsoleConsumer.innerKeySerdeName, "org.apache.kafka.common.serialization.Serdes$StringSerde",
-      "--property", "my-test1=abc"
+      "--property", "key.deserializer=org.apache.kafka.test.MockDeserializer",
+      "--property", "key.deserializer.my-props=abc"
     )
     val config = new ConsoleConsumer.ConsumerConfig(args)
     assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter])
+    assertTrue(config.formatterArgs.containsKey("key.deserializer.my-props"))
     val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter]
-    assertTrue(formatter.keyDeserializer.get.isInstanceOf[ByteArrayDeserializer])
-    assertTrue(config.formatterArgs.containsKey("my-test1"))
-    assertTrue(config.formatterArgs.containsKey(ConsoleConsumer.innerKeySerdeName))
+    assertTrue(formatter.keyDeserializer.get.isInstanceOf[MockDeserializer])
+    assertEquals(1, formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].configs.size)
+    assertEquals("abc", formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].configs.get("my-props"))
+    assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey)
   }
+
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index fc673d0..52b9ee8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -781,7 +781,7 @@ public class KStreamAggregationIntegrationTest {
                 "--property", "key.deserializer=" + keyDeserializer.getClass().getName(),
                 "--property", "value.deserializer=" + valueDeserializer.getClass().getName(),
                 "--property", "key.separator=" + keySeparator,
-                "--" + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.serdeFrom(innerClass).getClass().getName()
+                "--property", "key.deserializer." + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS + "=" + Serdes.serdeFrom(innerClass).getClass().getName()
             };
 
             ConsoleConsumer.messageCount_$eq(0); //reset the message count

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.