You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/02 19:38:14 UTC

kafka git commit: KAFKA-3982: Fix processing order of some of the consumer properties

Repository: kafka
Updated Branches:
  refs/heads/trunk 0a8b10e27 -> b63e41ea7


KAFKA-3982: Fix processing order of some of the consumer properties

This PR updates processing of console consumer's input properties.

For both old and new consumer, the value provided for `auto.offset.reset` indirectly through `consumer.config` or `consumer.property` arguments will now take effect.
For new consumer and for `key.deserializer` and `value.deserializer` properties, the precedence order is fixed to first the value directly provided as an argument, then the value provided indirectly via `consumer.property` and then `consumer.config`, and finally a default value.

Author: Vahid Hashemian <va...@us.ibm.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1655 from vahidhashemian/KAFKA-3982


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b63e41ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b63e41ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b63e41ea

Branch: refs/heads/trunk
Commit: b63e41ea78a58bdea78be33f90bfcb61ce5988d3
Parents: 0a8b10e
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Fri Jun 2 12:38:11 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Jun 2 12:38:11 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsoleConsumer.scala     | 34 ++++++++++++++++----
 .../unit/kafka/tools/ConsoleConsumerTest.scala  | 32 ++++++++++++++++++
 2 files changed, 60 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b63e41ea/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index a1e2ffa..664557a 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -174,7 +174,8 @@ object ConsoleConsumer extends Logging {
 
     props.putAll(config.consumerProps)
     props.putAll(config.extraConsumerProps)
-    props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest")
+    if (!props.containsKey("auto.offset.reset"))
+      props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest")
     props.put("zookeeper.connect", config.zkConnectionStr)
 
     if (!config.options.has(config.deleteConsumerOffsetsOpt) && config.options.has(config.resetBeginningOpt) &&
@@ -197,7 +198,8 @@ object ConsoleConsumer extends Logging {
 
     props.putAll(config.consumerProps)
     props.putAll(config.extraConsumerProps)
-    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
+    if (!props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
@@ -327,12 +329,32 @@ object ConsoleConsumer extends Logging {
     val isolationLevel = options.valueOf(isolationLevelOpt).toString
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
 
-    if (keyDeserializer != null && !keyDeserializer.isEmpty) {
+    if (keyDeserializer != null && !keyDeserializer.isEmpty)
+      // the argument that is provided directly takes precedence
       formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
-    }
-    if (valueDeserializer != null && !valueDeserializer.isEmpty) {
+    else if (extraConsumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
+      // then the argument that is provided through --consumer-property
+      formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, extraConsumerProps.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
+    else if (consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
+      // then the argument that is provided through --consumer.config
+      formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumerProps.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
+    else
+      // the default is used if the argument is not provided directly or indirectly
+      formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+
+    if (valueDeserializer != null && !valueDeserializer.isEmpty)
+      // the argument that is provided directly takes precedence
       formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
-    }
+    else if (extraConsumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
+      // then the argument that is provided through --consumer-property
+      formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, extraConsumerProps.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
+    else if (consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
+      // then the argument that is provided through --consumer.config
+      formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumerProps.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
+    else
+      // the default is used if the argument is not provided directly or indirectly
+      formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+
     formatter.init(formatterArgs)
 
     if (useOldConsumer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b63e41ea/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index e0917a2..3cfb5a5 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -191,4 +191,36 @@ class ConsoleConsumerTest {
 
     assertEquals("1000", config.consumerProps.getProperty("request.timeout.ms"))
   }
+
+  @Test
+  def shouldOverwriteConfigFromConfigFileOrPropertiesWithConfigFromArguments() {
+    val propsFile = TestUtils.tempFile()
+    val propsStream = new FileOutputStream(propsFile)
+    propsStream.write("bootstrap.servers=localhost:9093\n".getBytes())
+    propsStream.write("auto.offset.reset=earliest\n".getBytes())
+    propsStream.write("key.deserializer=org.apache.kafka.common.serialization.LongDeserializer\n".getBytes())
+    propsStream.write("value.deserializer=org.apache.kafka.common.serialization.LongDeserializer".getBytes())
+    propsStream.close()
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--key-deserializer", "org.apache.kafka.common.serialization.DoubleDeserializer",
+      "--value-deserializer", "org.apache.kafka.common.serialization.DoubleDeserializer",
+      "--consumer-property", "auto.offset.reset=latest",
+      "--consumer-property", "key.deserializer=org.apache.kafka.common.serialization.FloatDeserializer",
+      "--consumer-property", "value.deserializer=org.apache.kafka.common.serialization.FloatDeserializer",
+      "--consumer.config", propsFile.getAbsolutePath
+    )
+
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+    val props = ConsoleConsumer.getNewConsumerProps(config)
+
+    assertEquals("localhost:9092", props.getProperty("bootstrap.servers"))
+    assertEquals("latest", props.getProperty("auto.offset.reset"))
+    assertEquals("org.apache.kafka.common.serialization.DoubleDeserializer", config.formatterArgs.getProperty("key.deserializer"))
+    assertEquals("org.apache.kafka.common.serialization.DoubleDeserializer", config.formatterArgs.getProperty("value.deserializer"))
+    // serde settings applies to message formatter only, not the consumer itself
+    assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.getProperty("key.deserializer"))
+    assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.getProperty("value.deserializer"))
+  }
 }