You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/02/09 09:56:34 UTC
[pinot] branch master updated: Add SASL_SSL support for stream github event command (#10253)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 63c1e504db Add SASL_SSL support for stream github event command (#10253)
63c1e504db is described below
commit 63c1e504db3b241314899a12d251b39d2604bdfe
Author: Seunghyun Lee <se...@startree.ai>
AuthorDate: Thu Feb 9 01:56:24 2023 -0800
Add SASL_SSL support for stream github event command (#10253)
- Adding the sasl_ssl support for stream github event command
- This allows to configure to publish events to the kafka topics
with sasl_ssl/plaintext.
---
.../admin/command/StreamGitHubEventsCommand.java | 35 +++++++++++++++-------
.../PullRequestMergedEventsStream.java | 18 +++++++++--
2 files changed, 40 insertions(+), 13 deletions(-)
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java
index 579dd63089..7b0fea34d4 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java
@@ -38,16 +38,27 @@ public class StreamGitHubEventsCommand extends AbstractBaseAdminCommand implemen
@CommandLine.Option(names = {"-personalAccessToken"}, required = true, description = "GitHub personal access token.")
private String _personalAccessToken;
- @CommandLine.Option(names = {"-sourceType"}, defaultValue = "Kafka",
- description = "Stream DataSource to use for ingesting data. Supported values - Kafka,Kinesis")
+ @CommandLine.Option(names = {"-sourceType"}, defaultValue = "Kafka", description = "Stream DataSource to use for "
+ + "ingesting data. Supported values - Kafka,Kinesis")
private String _sourceType;
- @CommandLine.Option(names = {"-kafkaBrokerList"},
- description = "Kafka broker list of the kafka cluster to produce events.")
+ @CommandLine.Option(names = {"-kafkaBrokerList"}, description = "Kafka broker list of the kafka cluster to produce "
+ + "events.")
private String _kafkaBrokerList = KafkaStarterUtils.DEFAULT_KAFKA_BROKER;
- @CommandLine.Option(names = {"-kinesisEndpoint"},
- description = "Endpoint of localstack or any other Kinesis cluster when not using AWS.")
+ @CommandLine.Option(names = {"-kafkaSecurityProtocol"}, description = "Kafka security protocol "
+ + "(PLAINTEXT/SASL_SSL/etc")
+ private String _kafkaSecurityProtocol;
+
+ // Only needed when configured as SASL_SSL
+ @CommandLine.Option(names = {"-kafkaUserName"}, description = "Kafka Username")
+ private String _kafkaSaslUserName;
+
+ @CommandLine.Option(names = {"-kafkaPassword"}, description = "Kafka Password")
+ private String _kafkaSaslPassword;
+
+ @CommandLine.Option(names = {"-kinesisEndpoint"}, description = "Endpoint of localstack or any other Kinesis "
+ + "cluster when not using AWS.")
private String _kinesisEndpoint = null;
@CommandLine.Option(names = {"-awsRegion"}, description = "AWS Region in which Kinesis is located")
@@ -59,12 +70,12 @@ public class StreamGitHubEventsCommand extends AbstractBaseAdminCommand implemen
@CommandLine.Option(names = {"-awsSecretKey"}, description = "SecretKey for AWS Account")
private String _secretKey;
- @CommandLine.Option(names = {"-topic"}, required = true,
- description = "Name of kafka-topic/kinesis-stream to publish events.")
+ @CommandLine.Option(names = {"-topic"}, required = true, description = "Name of kafka-topic/kinesis-stream to "
+ + "publish events.")
private String _topic;
- @CommandLine.Option(names = {"-eventType"},
- description = "Type of GitHub event. Supported types - pullRequestMergedEvent")
+ @CommandLine.Option(names = {"-eventType"}, description = "Type of GitHub event. Supported types - "
+ + "pullRequestMergedEvent")
private String _eventType = PULL_REQUEST_MERGED_EVENT_TYPE;
@CommandLine.Option(names = {"-schemaFile"}, description = "Path to schema file. "
@@ -133,7 +144,9 @@ public class StreamGitHubEventsCommand extends AbstractBaseAdminCommand implemen
break;
case KAFKA:
default:
- streamDataProducer = PullRequestMergedEventsStream.getKafkaStreamDataProducer(_kafkaBrokerList);
+ streamDataProducer =
+ PullRequestMergedEventsStream.getKafkaStreamDataProducer(_kafkaBrokerList, _kafkaSecurityProtocol,
+ _kafkaSaslUserName, _kafkaSaslPassword);
break;
}
PullRequestMergedEventsStream pullRequestMergedEventsStream =
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
index 950af0d260..34cc6222a6 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
@@ -82,15 +82,29 @@ public class PullRequestMergedEventsStream {
public static StreamDataProducer getKafkaStreamDataProducer()
throws Exception {
- return getKafkaStreamDataProducer(KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
+ return getKafkaStreamDataProducer(KafkaStarterUtils.DEFAULT_KAFKA_BROKER, null, null, null);
}
- public static StreamDataProducer getKafkaStreamDataProducer(String kafkaBrokerList)
+ public static StreamDataProducer getKafkaStreamDataProducer(String kafkaBrokerList, String kafkaSecurityProtocol,
+ String kafkaSaslUserName, String kafkaSaslPassword)
throws Exception {
Properties properties = new Properties();
properties.put("metadata.broker.list", kafkaBrokerList);
properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
properties.put("request.required.acks", "1");
+
+ if (StringUtils.isNotEmpty(kafkaSecurityProtocol)) {
+ properties.put("security.protocol", kafkaSecurityProtocol);
+ // If the protocol is 'SASL_SSL', fill the sasl related configs
+ if (kafkaSecurityProtocol.equals("SASL_SSL") && StringUtils.isNotEmpty(kafkaSaslUserName)
+ && StringUtils.isNotEmpty(kafkaSaslPassword)) {
+ properties.put("sasl.mechanism", "PLAIN");
+ String jaasConfig = String.format(
+ "org.apache.kafka.common.security.plain.PlainLoginModule required \n username=\"%s\" \n password=\"%s\";",
+ kafkaSaslUserName, kafkaSaslPassword);
+ properties.put("sasl.jaas.config", jaasConfig);
+ }
+ }
return StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org