You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/05/25 00:11:25 UTC
[kafka] branch trunk updated: KAFKA-10022:console-producer supports
the setting of client.id (#8698)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 45383f7 KAFKA-10022:console-producer supports the setting of client.id (#8698)
45383f7 is described below
commit 45383f75b3b343ac2695f0722b794947ee37098c
Author: 阿洋 <xi...@126.com>
AuthorDate: Mon May 25 08:10:43 2020 +0800
KAFKA-10022:console-producer supports the setting of client.id (#8698)
"console-producer" supports the setting of "client.id", which is a reasonable requirement, and the way "console consumer" and "console producer" handle "client.id" can be unified. "client.id" defaults to "console-producer"
Co-authored-by: xinzhuxiansheng <xi...@autohome.com.cn>
Reviewers: Guozhang Wang <wa...@gmail.com>
---
core/src/main/scala/kafka/tools/ConsoleProducer.scala | 5 +++--
.../scala/unit/kafka/tools/ConsoleProducerTest.scala | 16 ++++++++++++++++
2 files changed, 19 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 06bef80..04c6818 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -86,13 +86,14 @@ object ConsoleProducer {
props ++= config.extraProducerProps
- if(config.bootstrapServer != null)
+ if (config.bootstrapServer != null)
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
else
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
+ if (props.getProperty(ProducerConfig.CLIENT_ID_CONFIG) == null)
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
index ef7b6c6..a636c32 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
@@ -59,6 +59,14 @@ class ConsoleProducerTest {
"--topic",
"t3",
)
+ val clientIdOverride: Array[String] = Array(
+ "--broker-list",
+ "localhost:1001",
+ "--topic",
+ "t3",
+ "--producer-property",
+ "client.id=producer-1"
+ )
@Test
def testValidConfigsBrokerList(): Unit = {
@@ -102,4 +110,12 @@ class ConsoleProducerTest {
assertEquals(util.Arrays.asList("localhost:1002"),
producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
}
+
+ @Test
+ def testClientIdOverride(): Unit = {
+ val config = new ConsoleProducer.ProducerConfig(clientIdOverride)
+ val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config))
+ assertEquals("producer-1",
+ producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG))
+ }
}