You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/05/25 00:11:25 UTC

[kafka] branch trunk updated: KAFKA-10022:console-producer supports the setting of client.id (#8698)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 45383f7  KAFKA-10022:console-producer supports the setting of client.id (#8698)
45383f7 is described below

commit 45383f75b3b343ac2695f0722b794947ee37098c
Author: 阿洋 <xi...@126.com>
AuthorDate: Mon May 25 08:10:43 2020 +0800

    KAFKA-10022:console-producer supports the setting of client.id (#8698)
    
    "console-producer" now honors a user-supplied "client.id" (for example, one passed via --producer-property) instead of always overwriting it. This is a reasonable requirement, and it unifies the way "console consumer" and "console producer" handle "client.id". When "client.id" is not set, it defaults to "console-producer".
    
    Co-authored-by: xinzhuxiansheng <xi...@autohome.com.cn>
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 core/src/main/scala/kafka/tools/ConsoleProducer.scala    |  5 +++--
 .../scala/unit/kafka/tools/ConsoleProducerTest.scala     | 16 ++++++++++++++++
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 06bef80..04c6818 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -86,13 +86,14 @@ object ConsoleProducer {
 
     props ++= config.extraProducerProps
 
-    if(config.bootstrapServer != null)
+    if (config.bootstrapServer != null)
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
     else
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
 
     props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
-    props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
+    if (props.getProperty(ProducerConfig.CLIENT_ID_CONFIG) == null)
+      props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
 
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
index ef7b6c6..a636c32 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
@@ -59,6 +59,14 @@ class ConsoleProducerTest {
     "--topic",
     "t3",
   )
+  val clientIdOverride: Array[String] = Array(
+    "--broker-list",
+    "localhost:1001",
+    "--topic",
+    "t3",
+    "--producer-property",
+    "client.id=producer-1"
+  )
 
   @Test
   def testValidConfigsBrokerList(): Unit = {
@@ -102,4 +110,12 @@ class ConsoleProducerTest {
     assertEquals(util.Arrays.asList("localhost:1002"),
       producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
   }
+
+  @Test
+  def testClientIdOverride(): Unit = {
+    val config = new ConsoleProducer.ProducerConfig(clientIdOverride)
+    val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config))
+    assertEquals("producer-1",
+      producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG))
+  }
 }