You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2021/07/27 06:25:30 UTC
[pulsar] branch master updated: Allow to config Sasl configs in
Kafka sink. (#11422)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3afbef2 Allow to config Sasl configs in Kafka sink. (#11422)
3afbef2 is described below
commit 3afbef218491dd2b4d4005e4819de2751c9eb9b9
Author: Shoothzj <sh...@gmail.com>
AuthorDate: Tue Jul 27 14:24:35 2021 +0800
Allow to config Sasl configs in Kafka sink. (#11422)
### Motivation
Allow Pulsar IO to push messages to a SASL-enabled Kafka cluster.
### Modifications
Add several Kafka SASL configs and make them configurable.
### Documentation
I think pulsar-io can automatically generate the docs for the added fields, so we don't need to add any docs.
---
.../apache/pulsar/io/kafka/KafkaAbstractSink.java | 26 ++++-
.../apache/pulsar/io/kafka/KafkaSinkConfig.java | 105 +++++++++++++++------
.../io/kafka/sink/KafkaAbstractSinkTest.java | 19 ++++
.../src/test/resources/kafkaSinkConfigSasl.yaml | 29 ++++++
4 files changed, 147 insertions(+), 32 deletions(-)
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index 30c6698..7bfbc21 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -25,11 +25,14 @@ import java.util.Objects;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
-
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
@@ -94,6 +97,27 @@ public abstract class KafkaAbstractSink<K, V> implements Sink<byte[]> {
}
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkConfig.getBootstrapServers());
+ if (StringUtils.isNotEmpty(kafkaSinkConfig.getSecurityProtocol())) {
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSinkConfig.getSecurityProtocol());
+ }
+ if (StringUtils.isNotEmpty(kafkaSinkConfig.getSaslMechanism())) {
+ props.put(SaslConfigs.SASL_MECHANISM, kafkaSinkConfig.getSaslMechanism());
+ }
+ if (StringUtils.isNotEmpty(kafkaSinkConfig.getSaslJaasConfig())) {
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaSinkConfig.getSaslJaasConfig());
+ }
+ if (StringUtils.isNotEmpty(kafkaSinkConfig.getSslEnabledProtocols())) {
+ props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, kafkaSinkConfig.getSslEnabledProtocols());
+ }
+ if (StringUtils.isNotEmpty(kafkaSinkConfig.getSslEndpointIdentificationAlgorithm())) {
+ props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, kafkaSinkConfig.getSslEndpointIdentificationAlgorithm());
+ }
+ if (StringUtils.isNotEmpty(kafkaSinkConfig.getSslTruststoreLocation())) {
+ props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaSinkConfig.getSslTruststoreLocation());
+ }
+ if (StringUtils.isNotEmpty(kafkaSinkConfig.getSslTruststorePassword())) {
+ props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaSinkConfig.getSslTruststorePassword());
+ }
props.put(ProducerConfig.ACKS_CONFIG, kafkaSinkConfig.getAcks());
props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(kafkaSinkConfig.getBatchSize()));
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(kafkaSinkConfig.getMaxRequestSize()));
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
index 48c6488..e0cdff8 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
@@ -37,51 +37,94 @@ public class KafkaSinkConfig implements Serializable {
private static final long serialVersionUID = 1L;
@FieldDoc(
- required = true,
- defaultValue = "",
- help =
- "A comma-separated list of host and port pairs that are the addresses of "
- + "the Kafka brokers that a Kafka client connects to initially bootstrap itself")
+ required = true,
+ defaultValue = "",
+ help =
+ "A comma-separated list of host and port pairs that are the addresses of "
+ + "the Kafka brokers that a Kafka client connects to initially bootstrap itself")
private String bootstrapServers;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "Protocol used to communicate with kafka brokers.")
+ private String securityProtocol;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "SASL mechanism used for kafka client connections.")
+ private String saslMechanism;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "JAAS login context parameters for SASL connections in the format used by JAAS configuration files.")
+ private String saslJaasConfig;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "The list of protocols enabled for SSL connections.")
+ private String sslEnabledProtocols;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "The endpoint identification algorithm to validate server hostname using server certificate.")
+ private String sslEndpointIdentificationAlgorithm;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "The location of the trust store file.")
+ private String sslTruststoreLocation;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "The password for the trust store file.")
+ private String sslTruststorePassword;
+
@FieldDoc(
- required = true,
- defaultValue = "",
- help =
- "The number of acknowledgments the producer requires the leader to have received "
- + "before considering a request complete. This controls the durability of records that are sent.")
+ required = true,
+ defaultValue = "",
+ help =
+ "The number of acknowledgments the producer requires the leader to have received "
+ + "before considering a request complete. This controls the durability of records that are sent.")
private String acks;
@FieldDoc(
- defaultValue = "16384L",
- help =
- "The batch size that Kafka producer will attempt to batch records together before sending them to brokers.")
+ defaultValue = "16384L",
+ help =
+ "The batch size that Kafka producer will attempt to batch records together before sending them to brokers.")
private long batchSize = 16384L;
@FieldDoc(
- defaultValue = "1048576L",
- help =
- "The maximum size of a Kafka request in bytes.")
+ defaultValue = "1048576L",
+ help =
+ "The maximum size of a Kafka request in bytes.")
private long maxRequestSize = 1048576L;
@FieldDoc(
- required = true,
- defaultValue = "",
- help =
- "The Kafka topic that is used for Pulsar moving messages to.")
+ required = true,
+ defaultValue = "",
+ help =
+ "The Kafka topic that is used for Pulsar moving messages to.")
private String topic;
@FieldDoc(
- defaultValue = "org.apache.kafka.common.serialization.StringSerializer",
- help =
- "The serializer class for Kafka producer to serialize keys.")
+ defaultValue = "org.apache.kafka.common.serialization.StringSerializer",
+ help =
+ "The serializer class for Kafka producer to serialize keys.")
private String keySerializerClass = "org.apache.kafka.common.serialization.StringSerializer";
@FieldDoc(
- defaultValue = "org.apache.kafka.common.serialization.ByteArraySerializer",
- help =
- "The serializer class for Kafka producer to serialize values. You typically shouldn't care this. "
- + "Since the serializer will be set by a specific implementation of `KafkaAbstractSink`.")
+ defaultValue = "org.apache.kafka.common.serialization.ByteArraySerializer",
+ help =
+ "The serializer class for Kafka producer to serialize values. You typically shouldn't care this. "
+ + "Since the serializer will be set by a specific implementation of `KafkaAbstractSink`.")
private String valueSerializerClass = "org.apache.kafka.common.serialization.ByteArraySerializer";
@FieldDoc(
- defaultValue = "",
- help =
- "The producer config properties to be passed to Producer. Note that other properties specified "
- + "in the connector config file take precedence over this config.")
+ defaultValue = "",
+ help =
+ "The producer config properties to be passed to Producer. Note that other properties specified "
+ + "in the connector config file take precedence over this config.")
private Map<String, Object> producerConfigProperties;
public static KafkaSinkConfig load(String yamlFile) throws IOException {
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
index f5fe46b..c04f654 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.io.kafka.sink;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
@@ -221,6 +222,24 @@ public class KafkaAbstractSinkTest {
assertEquals("1", props.getProperty(ProducerConfig.ACKS_CONFIG));
}
+ @Test
+ public final void loadFromSaslYamlFileTest() throws IOException {
+ File yamlFile = getFile("kafkaSinkConfigSasl.yaml");
+ KafkaSinkConfig config = KafkaSinkConfig.load(yamlFile.getAbsolutePath());
+ assertNotNull(config);
+ assertEquals(config.getBootstrapServers(), "localhost:6667");
+ assertEquals(config.getTopic(), "test");
+ assertEquals(config.getAcks(), "1");
+ assertEquals(config.getBatchSize(), 16384L);
+ assertEquals(config.getMaxRequestSize(), 1048576L);
+ assertEquals(config.getSecurityProtocol(), SecurityProtocol.SASL_PLAINTEXT.name);
+ assertEquals(config.getSaslMechanism(), "PLAIN");
+ assertEquals(config.getSaslJaasConfig(), "org.apache.kafka.common.security.plain.PlainLoginModule required \nusername=\"alice\" \npassword=\"pwd\";");
+ assertEquals(config.getSslEndpointIdentificationAlgorithm(), "");
+ assertEquals(config.getSslTruststoreLocation(), "/etc/cert.pem");
+ assertEquals(config.getSslTruststorePassword(), "cert_pwd");
+ }
+
private File getFile(String name) {
ClassLoader classLoader = getClass().getClassLoader();
return new File(classLoader.getResource(name).getFile());
diff --git a/pulsar-io/kafka/src/test/resources/kafkaSinkConfigSasl.yaml b/pulsar-io/kafka/src/test/resources/kafkaSinkConfigSasl.yaml
new file mode 100644
index 0000000..384d2fd
--- /dev/null
+++ b/pulsar-io/kafka/src/test/resources/kafkaSinkConfigSasl.yaml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"bootstrapServers": "localhost:6667"
+"topic": "test"
+"acks": "1"
+"securityProtocol": "SASL_PLAINTEXT"
+"saslMechanism" : "PLAIN"
+"saslJaasConfig" : "org.apache.kafka.common.security.plain.PlainLoginModule required \nusername=\"alice\" \npassword=\"pwd\";"
+"sslEnabledProtocols" : "TLSv1.2"
+"sslEndpointIdentificationAlgorithm" : ""
+"sslTruststoreLocation" : "/etc/cert.pem"
+"sslTruststorePassword" : "cert_pwd"
\ No newline at end of file