You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/25 21:31:00 UTC

[jira] [Commented] (KAFKA-3567) Add --security-protocol option to console consumer and producer

    [ https://issues.apache.org/jira/browse/KAFKA-3567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16376255#comment-16376255 ] 

ASF GitHub Bot commented on KAFKA-3567:
---------------------------------------

hachikuji closed pull request #1409: KAFKA-3567 Add --security-protocol option to console consumer and producer
URL: https://github.com/apache/kafka/pull/1409
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 89536400e1c..f4c391b4b3e 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -28,6 +28,7 @@ import kafka.message._
 import kafka.metrics.KafkaMetricsReporter
 import kafka.utils._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
+import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.Deserializer
@@ -189,6 +190,8 @@ object ConsoleConsumer extends Logging {
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, if (config.keyDeserializer != null) config.keyDeserializer else "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, if (config.valueDeserializer != null) config.valueDeserializer else "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    if(props.getProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)==null)
+      props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.securityProtocol)
 
     props
   }
@@ -260,6 +263,11 @@ object ConsoleConsumer extends Logging {
     val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
                                                        "Log lifecycle events of the consumer in addition to logging consumed " +
                                                        "messages. (This is specific for system tests.)")
+    val securityProtocolOpt = parser.accepts("security-protocol", "Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL")
+      .withRequiredArg
+      .describedAs("security-protocol")
+      .ofType(classOf[String])
+      .defaultsTo("PLAINTEXT")
 
     if (args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")
@@ -303,6 +311,7 @@ object ConsoleConsumer extends Logging {
     val valueDeserializer = options.valueOf(valueDeserializerOpt)
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
+    val securityProtocol = options.valueOf(securityProtocolOpt)
 
     CommandLineUtils.checkRequiredArgs(parser, options, if (useNewConsumer) bootstrapServerOpt else zkConnectOpt)
 
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index e6476015f19..10f6e260433 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -28,6 +28,7 @@ import java.io._
 import joptsimple._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.clients.CommonClientConfigs
 
 object ConsoleProducer {
 
@@ -125,6 +126,8 @@ object ConsoleProducer {
     props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    if(props.getProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)==null)
+      props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.securityProtocol)
 
     props
   }
@@ -245,7 +248,11 @@ object ConsoleProducer {
       .describedAs("config file")
       .ofType(classOf[String])
     val useOldProducerOpt = parser.accepts("old-producer", "Use the old producer implementation.")
-
+    val securityProtocolOpt = parser.accepts("security-protocol", "Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL")
+      .withRequiredArg
+      .describedAs("security-protocol")
+      .ofType(classOf[String])
+      .defaultsTo("PLAINTEXT")
     val options = parser.parse(args : _*)
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "Read data from standard input and publish it to Kafka.")
@@ -282,6 +289,7 @@ object ConsoleProducer {
     val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt)
     val metadataExpiryMs = options.valueOf(metadataExpiryMsOpt)
     val maxBlockMs = options.valueOf(maxBlockMsOpt)
+    val securityProtocol = options.valueOf(securityProtocolOpt)
   }
 
   class LineMessageReader extends MessageReader {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add --security-protocol option to console consumer and producer
> ---------------------------------------------------------------
>
>                 Key: KAFKA-3567
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3567
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.9.0.0
>            Reporter: Sriharsha Chintalapani
>            Assignee: Bharat Viswanadham
>            Priority: Major
>              Labels: needs-kip
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)