You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/02 19:38:14 UTC
kafka git commit: KAFKA-3982: Fix processing order of some of the
consumer properties
Repository: kafka
Updated Branches:
refs/heads/trunk 0a8b10e27 -> b63e41ea7
KAFKA-3982: Fix processing order of some of the consumer properties
This PR updates processing of console consumer's input properties.
For both old and new consumer, the value provided for `auto.offset.reset` indirectly through `consumer.config` or `consumer.property` arguments will now take effect.
For new consumer and for `key.deserializer` and `value.deserializer` properties, the precedence order is fixed to first the value directly provided as an argument, then the value provided indirectly via `consumer.property` and then `consumer.config`, and finally a default value.
Author: Vahid Hashemian <va...@us.ibm.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1655 from vahidhashemian/KAFKA-3982
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b63e41ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b63e41ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b63e41ea
Branch: refs/heads/trunk
Commit: b63e41ea78a58bdea78be33f90bfcb61ce5988d3
Parents: 0a8b10e
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Fri Jun 2 12:38:11 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Jun 2 12:38:11 2017 -0700
----------------------------------------------------------------------
.../scala/kafka/tools/ConsoleConsumer.scala | 34 ++++++++++++++++----
.../unit/kafka/tools/ConsoleConsumerTest.scala | 32 ++++++++++++++++++
2 files changed, 60 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b63e41ea/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index a1e2ffa..664557a 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -174,7 +174,8 @@ object ConsoleConsumer extends Logging {
props.putAll(config.consumerProps)
props.putAll(config.extraConsumerProps)
- props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest")
+ if (!props.containsKey("auto.offset.reset"))
+ props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest")
props.put("zookeeper.connect", config.zkConnectionStr)
if (!config.options.has(config.deleteConsumerOffsetsOpt) && config.options.has(config.resetBeginningOpt) &&
@@ -197,7 +198,8 @@ object ConsoleConsumer extends Logging {
props.putAll(config.consumerProps)
props.putAll(config.extraConsumerProps)
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
+ if (!props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
@@ -327,12 +329,32 @@ object ConsoleConsumer extends Logging {
val isolationLevel = options.valueOf(isolationLevelOpt).toString
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
- if (keyDeserializer != null && !keyDeserializer.isEmpty) {
+ if (keyDeserializer != null && !keyDeserializer.isEmpty)
+ // the argument that is provided directly takes precedence
formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
- }
- if (valueDeserializer != null && !valueDeserializer.isEmpty) {
+ else if (extraConsumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
+ // then the argument that is provided through --consumer-property
+ formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, extraConsumerProps.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
+ else if (consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
+ // then the argument that is provided through --consumer.config
+ formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumerProps.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
+ else
+ // the default is used if the argument is not provided directly or indirectly
+ formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+
+ if (valueDeserializer != null && !valueDeserializer.isEmpty)
+ // the argument that is provided directly takes precedence
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
- }
+ else if (extraConsumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
+ // then the argument that is provided through --consumer-property
+ formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, extraConsumerProps.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
+ else if (consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
+ // then the argument that is provided through --consumer.config
+ formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumerProps.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
+ else
+ // the default is used if the argument is not provided directly or indirectly
+ formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+
formatter.init(formatterArgs)
if (useOldConsumer) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b63e41ea/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index e0917a2..3cfb5a5 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -191,4 +191,36 @@ class ConsoleConsumerTest {
assertEquals("1000", config.consumerProps.getProperty("request.timeout.ms"))
}
+
+ @Test
+ def shouldOverwriteConfigFromConfigFileOrPropertiesWithConfigFromArguments() {
+ val propsFile = TestUtils.tempFile()
+ val propsStream = new FileOutputStream(propsFile)
+ propsStream.write("bootstrap.servers=localhost:9093\n".getBytes())
+ propsStream.write("auto.offset.reset=earliest\n".getBytes())
+ propsStream.write("key.deserializer=org.apache.kafka.common.serialization.LongDeserializer\n".getBytes())
+ propsStream.write("value.deserializer=org.apache.kafka.common.serialization.LongDeserializer".getBytes())
+ propsStream.close()
+ val args: Array[String] = Array(
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--key-deserializer", "org.apache.kafka.common.serialization.DoubleDeserializer",
+ "--value-deserializer", "org.apache.kafka.common.serialization.DoubleDeserializer",
+ "--consumer-property", "auto.offset.reset=latest",
+ "--consumer-property", "key.deserializer=org.apache.kafka.common.serialization.FloatDeserializer",
+ "--consumer-property", "value.deserializer=org.apache.kafka.common.serialization.FloatDeserializer",
+ "--consumer.config", propsFile.getAbsolutePath
+ )
+
+ val config = new ConsoleConsumer.ConsumerConfig(args)
+ val props = ConsoleConsumer.getNewConsumerProps(config)
+
+ assertEquals("localhost:9092", props.getProperty("bootstrap.servers"))
+ assertEquals("latest", props.getProperty("auto.offset.reset"))
+ assertEquals("org.apache.kafka.common.serialization.DoubleDeserializer", config.formatterArgs.getProperty("key.deserializer"))
+ assertEquals("org.apache.kafka.common.serialization.DoubleDeserializer", config.formatterArgs.getProperty("value.deserializer"))
+ // serde settings applies to message formatter only, not the consumer itself
+ assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.getProperty("key.deserializer"))
+ assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.getProperty("value.deserializer"))
+ }
}