You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/02/09 09:56:34 UTC

[pinot] branch master updated: Add SASL_SSL support for stream github event command (#10253)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 63c1e504db Add SASL_SSL support for stream github event command (#10253)
63c1e504db is described below

commit 63c1e504db3b241314899a12d251b39d2604bdfe
Author: Seunghyun Lee <se...@startree.ai>
AuthorDate: Thu Feb 9 01:56:24 2023 -0800

    Add SASL_SSL support for stream github event command (#10253)
    
    - Adding the sasl_ssl support for stream github event command
    - This allows the command to be configured to publish events to Kafka
      topics using either SASL_SSL or PLAINTEXT.
---
 .../admin/command/StreamGitHubEventsCommand.java   | 35 +++++++++++++++-------
 .../PullRequestMergedEventsStream.java             | 18 +++++++++--
 2 files changed, 40 insertions(+), 13 deletions(-)

diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java
index 579dd63089..7b0fea34d4 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java
@@ -38,16 +38,27 @@ public class StreamGitHubEventsCommand extends AbstractBaseAdminCommand implemen
   @CommandLine.Option(names = {"-personalAccessToken"}, required = true, description = "GitHub personal access token.")
   private String _personalAccessToken;
 
-  @CommandLine.Option(names = {"-sourceType"}, defaultValue = "Kafka",
-      description = "Stream DataSource to use for ingesting data. Supported values - Kafka,Kinesis")
+  @CommandLine.Option(names = {"-sourceType"}, defaultValue = "Kafka", description = "Stream DataSource to use for "
+      + "ingesting data. Supported values - Kafka,Kinesis")
   private String _sourceType;
 
-  @CommandLine.Option(names = {"-kafkaBrokerList"},
-      description = "Kafka broker list of the kafka cluster to produce events.")
+  @CommandLine.Option(names = {"-kafkaBrokerList"}, description = "Kafka broker list of the kafka cluster to produce "
+      + "events.")
   private String _kafkaBrokerList = KafkaStarterUtils.DEFAULT_KAFKA_BROKER;
 
-  @CommandLine.Option(names = {"-kinesisEndpoint"},
-      description = "Endpoint of localstack or any other Kinesis cluster when not using AWS.")
+  @CommandLine.Option(names = {"-kafkaSecurityProtocol"}, description = "Kafka security protocol "
+      + "(PLAINTEXT/SASL_SSL/etc")
+  private String _kafkaSecurityProtocol;
+
+  // Only needed when configured as SASL_SSL
+  @CommandLine.Option(names = {"-kafkaUserName"}, description = "Kafka Username")
+  private String _kafkaSaslUserName;
+
+  @CommandLine.Option(names = {"-kafkaPassword"}, description = "Kafka Password")
+  private String _kafkaSaslPassword;
+
+  @CommandLine.Option(names = {"-kinesisEndpoint"}, description = "Endpoint of localstack or any other Kinesis "
+      + "cluster when not using AWS.")
   private String _kinesisEndpoint = null;
 
   @CommandLine.Option(names = {"-awsRegion"}, description = "AWS Region in which Kinesis is located")
@@ -59,12 +70,12 @@ public class StreamGitHubEventsCommand extends AbstractBaseAdminCommand implemen
   @CommandLine.Option(names = {"-awsSecretKey"}, description = "SecretKey for AWS Account")
   private String _secretKey;
 
-  @CommandLine.Option(names = {"-topic"}, required = true,
-      description = "Name of kafka-topic/kinesis-stream to publish events.")
+  @CommandLine.Option(names = {"-topic"}, required = true, description = "Name of kafka-topic/kinesis-stream to "
+      + "publish events.")
   private String _topic;
 
-  @CommandLine.Option(names = {"-eventType"},
-      description = "Type of GitHub event. Supported types - pullRequestMergedEvent")
+  @CommandLine.Option(names = {"-eventType"}, description = "Type of GitHub event. Supported types - "
+      + "pullRequestMergedEvent")
   private String _eventType = PULL_REQUEST_MERGED_EVENT_TYPE;
 
   @CommandLine.Option(names = {"-schemaFile"}, description = "Path to schema file. "
@@ -133,7 +144,9 @@ public class StreamGitHubEventsCommand extends AbstractBaseAdminCommand implemen
           break;
         case KAFKA:
         default:
-          streamDataProducer = PullRequestMergedEventsStream.getKafkaStreamDataProducer(_kafkaBrokerList);
+          streamDataProducer =
+              PullRequestMergedEventsStream.getKafkaStreamDataProducer(_kafkaBrokerList, _kafkaSecurityProtocol,
+                  _kafkaSaslUserName, _kafkaSaslPassword);
           break;
       }
       PullRequestMergedEventsStream pullRequestMergedEventsStream =
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
index 950af0d260..34cc6222a6 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
@@ -82,15 +82,29 @@ public class PullRequestMergedEventsStream {
 
   public static StreamDataProducer getKafkaStreamDataProducer()
       throws Exception {
-    return getKafkaStreamDataProducer(KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
+    return getKafkaStreamDataProducer(KafkaStarterUtils.DEFAULT_KAFKA_BROKER, null, null, null);
   }
 
-  public static StreamDataProducer getKafkaStreamDataProducer(String kafkaBrokerList)
+  public static StreamDataProducer getKafkaStreamDataProducer(String kafkaBrokerList, String kafkaSecurityProtocol,
+      String kafkaSaslUserName, String kafkaSaslPassword)
       throws Exception {
     Properties properties = new Properties();
     properties.put("metadata.broker.list", kafkaBrokerList);
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
     properties.put("request.required.acks", "1");
+
+    if (StringUtils.isNotEmpty(kafkaSecurityProtocol)) {
+      properties.put("security.protocol", kafkaSecurityProtocol);
+      // If the protocol is 'SASL_SSL', fill the sasl related configs
+      if (kafkaSecurityProtocol.equals("SASL_SSL") && StringUtils.isNotEmpty(kafkaSaslUserName)
+          && StringUtils.isNotEmpty(kafkaSaslPassword)) {
+        properties.put("sasl.mechanism", "PLAIN");
+        String jaasConfig = String.format(
+            "org.apache.kafka.common.security.plain.PlainLoginModule required \n username=\"%s\" \n password=\"%s\";",
+            kafkaSaslUserName, kafkaSaslPassword);
+        properties.put("sasl.jaas.config", jaasConfig);
+      }
+    }
     return StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org