You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/18 15:16:00 UTC

[jira] [Commented] (KAFKA-5529) ConsoleProducer uses deprecated BaseProducer

    [ https://issues.apache.org/jira/browse/KAFKA-5529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655393#comment-16655393 ] 

ASF GitHub Bot commented on KAFKA-5529:
---------------------------------------

omkreddy closed pull request #3450: KAFKA-5529 Use only KafkaProducer in ConsoleProducer
URL: https://github.com/apache/kafka/pull/3450
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 1b221407b57..0ecebbf5735 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -21,13 +21,13 @@ import kafka.common._
 import kafka.message._
 import kafka.serializer._
 import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
-import kafka.producer.{NewShinyProducer, OldProducer}
 import java.util.Properties
 import java.io._
 import java.nio.charset.StandardCharsets
 
 import joptsimple._
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
@@ -41,12 +41,7 @@ object ConsoleProducer {
         val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
         reader.init(System.in, getReaderProps(config))
 
-        val producer =
-          if(config.useOldProducer) {
-            new OldProducer(getOldProducerProps(config))
-          } else {
-            new NewShinyProducer(getNewProducerProps(config))
-          }
+        val producer = new KafkaProducer[Array[Byte], Array[Byte]](getNewProducerProps(config))
 
         Runtime.getRuntime.addShutdownHook(new Thread() {
           override def run() {
@@ -58,7 +53,7 @@ object ConsoleProducer {
         do {
           message = reader.readMessage()
           if (message != null)
-            producer.send(message.topic, message.key, message.value)
+            producer.send(message, new ErrorLoggingCallback(message.topic, message.key, message.value, false))
         } while (message != null)
     } catch {
       case e: joptsimple.OptionException =>
@@ -78,29 +73,6 @@ object ConsoleProducer {
     props
   }
 
-  def getOldProducerProps(config: ProducerConfig): Properties = {
-    val props = producerProps(config)
-
-    props.put("metadata.broker.list", config.brokerList)
-    props.put("compression.codec", config.compressionCodec)
-    props.put("producer.type", if(config.sync) "sync" else "async")
-    props.put("batch.num.messages", config.batchSize.toString)
-    props.put("message.send.max.retries", config.messageSendMaxRetries.toString)
-    props.put("retry.backoff.ms", config.retryBackoffMs.toString)
-    props.put("queue.buffering.max.ms", config.sendTimeout.toString)
-    props.put("queue.buffering.max.messages", config.queueSize.toString)
-    props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString)
-    props.put("request.required.acks", config.requestRequiredAcks)
-    props.put("request.timeout.ms", config.requestTimeoutMs.toString)
-    props.put("key.serializer.class", config.keyEncoderClass)
-    props.put("serializer.class", config.valueEncoderClass)
-    props.put("send.buffer.bytes", config.socketBuffer.toString)
-    props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString)
-    props.put("client.id", "console-producer")
-
-    props
-  }
-
   private def producerProps(config: ProducerConfig): Properties = {
     val props =
       if (config.options.has(config.producerConfigOpt))
@@ -247,14 +219,12 @@ object ConsoleProducer {
       .withRequiredArg
       .describedAs("config file")
       .ofType(classOf[String])
-    val useOldProducerOpt = parser.accepts("old-producer", "Use the old producer implementation.")
 
     val options = parser.parse(args : _*)
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "Read data from standard input and publish it to Kafka.")
     CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt)
 
-    val useOldProducer = options.has(useOldProducerOpt)
     val topic = options.valueOf(topicOpt)
     val brokerList = options.valueOf(brokerListOpt)
     ToolsUtils.validatePortOrDie(parser,brokerList)
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
index da80c0d41e2..93be28778ba 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
@@ -17,7 +17,6 @@
 
 package kafka.tools
 
-import kafka.producer.ProducerConfig
 import ConsoleProducer.LineMessageReader
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.junit.{Assert, Test}
@@ -49,13 +48,6 @@ class ConsoleProducerTest {
     producer.close()
   }
 
-  @Test
-  @deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0")
-  def testValidConfigsOldProducer() {
-    val config = new ConsoleProducer.ProducerConfig(validArgs)
-    new ProducerConfig(ConsoleProducer.getOldProducerProps(config))
-  }
-
   @Test
   def testInvalidConfigs() {
     try {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> ConsoleProducer uses deprecated BaseProducer
> --------------------------------------------
>
>                 Key: KAFKA-5529
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5529
>             Project: Kafka
>          Issue Type: Improvement
>          Components: tools
>            Reporter: Evgeny Veretennikov
>            Assignee: Evgeny Veretennikov
>            Priority: Minor
>             Fix For: 2.0.0
>
>
> BaseProducer is deprecated, should use org.apache.kafka.clients.producer.KafkaProducer instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)