You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2021/07/27 06:25:30 UTC

[pulsar] branch master updated: Allow to config Sasl configs in Kafka sink. (#11422)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3afbef2  Allow to config Sasl configs in Kafka sink. (#11422)
3afbef2 is described below

commit 3afbef218491dd2b4d4005e4819de2751c9eb9b9
Author: Shoothzj <sh...@gmail.com>
AuthorDate: Tue Jul 27 14:24:35 2021 +0800

    Allow to config Sasl configs in Kafka sink. (#11422)
    
    ### Motivation
    Allow Pulsar IO to push messages to a SASL-secured Kafka cluster.
    
    ### Modifications
    
    Add several Kafka SASL configs and make them configurable.
    
    
    ### Documentation
    I think pulsar-io can automatically generate the docs for the added fields, so we don't need to add any docs.
---
 .../apache/pulsar/io/kafka/KafkaAbstractSink.java  |  26 ++++-
 .../apache/pulsar/io/kafka/KafkaSinkConfig.java    | 105 +++++++++++++++------
 .../io/kafka/sink/KafkaAbstractSinkTest.java       |  19 ++++
 .../src/test/resources/kafkaSinkConfigSasl.yaml    |  29 ++++++
 4 files changed, 147 insertions(+), 32 deletions(-)

diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index 30c6698..7bfbc21 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -25,11 +25,14 @@ import java.util.Objects;
 import java.util.Properties;
 
 import lombok.extern.slf4j.Slf4j;
-
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
@@ -94,6 +97,27 @@ public abstract class KafkaAbstractSink<K, V> implements Sink<byte[]> {
         }
 
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkConfig.getBootstrapServers());
+        if (StringUtils.isNotEmpty(kafkaSinkConfig.getSecurityProtocol())) {
+            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSinkConfig.getSecurityProtocol());
+        }
+        if (StringUtils.isNotEmpty(kafkaSinkConfig.getSaslMechanism())) {
+            props.put(SaslConfigs.SASL_MECHANISM, kafkaSinkConfig.getSaslMechanism());
+        }
+        if (StringUtils.isNotEmpty(kafkaSinkConfig.getSaslJaasConfig())) {
+            props.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaSinkConfig.getSaslJaasConfig());
+        }
+        if (StringUtils.isNotEmpty(kafkaSinkConfig.getSslEnabledProtocols())) {
+            props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, kafkaSinkConfig.getSslEnabledProtocols());
+        }
+        if (StringUtils.isNotEmpty(kafkaSinkConfig.getSslEndpointIdentificationAlgorithm())) {
+            props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, kafkaSinkConfig.getSslEndpointIdentificationAlgorithm());
+        }
+        if (StringUtils.isNotEmpty(kafkaSinkConfig.getSslTruststoreLocation())) {
+            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaSinkConfig.getSslTruststoreLocation());
+        }
+        if (StringUtils.isNotEmpty(kafkaSinkConfig.getSslTruststorePassword())) {
+            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaSinkConfig.getSslTruststorePassword());
+        }
         props.put(ProducerConfig.ACKS_CONFIG, kafkaSinkConfig.getAcks());
         props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(kafkaSinkConfig.getBatchSize()));
         props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(kafkaSinkConfig.getMaxRequestSize()));
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
index 48c6488..e0cdff8 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
@@ -37,51 +37,94 @@ public class KafkaSinkConfig implements Serializable {
     private static final long serialVersionUID = 1L;
 
     @FieldDoc(
-        required = true,
-        defaultValue = "",
-        help =
-            "A comma-separated list of host and port pairs that are the addresses of "
-          + "the Kafka brokers that a Kafka client connects to initially bootstrap itself")
+            required = true,
+            defaultValue = "",
+            help =
+                    "A comma-separated list of host and port pairs that are the addresses of "
+                            + "the Kafka brokers that a Kafka client connects to initially bootstrap itself")
     private String bootstrapServers;
+
+    @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "Protocol used to communicate with kafka brokers.")
+    private String securityProtocol;
+
+    @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "SASL mechanism used for kafka client connections.")
+    private String saslMechanism;
+
+    @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "JAAS login context parameters for SASL connections in the format used by JAAS configuration files.")
+    private String saslJaasConfig;
+
+    @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "The list of protocols enabled for SSL connections.")
+    private String sslEnabledProtocols;
+
+    @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "The endpoint identification algorithm to validate server hostname using server certificate.")
+    private String sslEndpointIdentificationAlgorithm;
+
+    @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "The location of the trust store file.")
+    private String sslTruststoreLocation;
+
+    @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "The password for the trust store file.")
+    private String sslTruststorePassword;
+
     @FieldDoc(
-        required = true,
-        defaultValue = "",
-        help =
-            "The number of acknowledgments the producer requires the leader to have received "
-          + "before considering a request complete. This controls the durability of records that are sent.")
+            required = true,
+            defaultValue = "",
+            help =
+                    "The number of acknowledgments the producer requires the leader to have received "
+                            + "before considering a request complete. This controls the durability of records that are sent.")
     private String acks;
     @FieldDoc(
-        defaultValue = "16384L",
-        help =
-            "The batch size that Kafka producer will attempt to batch records together before sending them to brokers.")
+            defaultValue = "16384L",
+            help =
+                    "The batch size that Kafka producer will attempt to batch records together before sending them to brokers.")
     private long batchSize = 16384L;
     @FieldDoc(
-        defaultValue = "1048576L",
-        help =
-            "The maximum size of a Kafka request in bytes.")
+            defaultValue = "1048576L",
+            help =
+                    "The maximum size of a Kafka request in bytes.")
     private long maxRequestSize = 1048576L;
     @FieldDoc(
-        required = true,
-        defaultValue = "",
-        help =
-            "The Kafka topic that is used for Pulsar moving messages to.")
+            required = true,
+            defaultValue = "",
+            help =
+                    "The Kafka topic that is used for Pulsar moving messages to.")
     private String topic;
     @FieldDoc(
-        defaultValue = "org.apache.kafka.common.serialization.StringSerializer",
-        help =
-            "The serializer class for Kafka producer to serialize keys.")
+            defaultValue = "org.apache.kafka.common.serialization.StringSerializer",
+            help =
+                    "The serializer class for Kafka producer to serialize keys.")
     private String keySerializerClass = "org.apache.kafka.common.serialization.StringSerializer";
     @FieldDoc(
-        defaultValue = "org.apache.kafka.common.serialization.ByteArraySerializer",
-        help =
-            "The serializer class for Kafka producer to serialize values. You typically shouldn't care this. "
-          + "Since the serializer will be set by a specific implementation of `KafkaAbstractSink`.")
+            defaultValue = "org.apache.kafka.common.serialization.ByteArraySerializer",
+            help =
+                    "The serializer class for Kafka producer to serialize values. You typically shouldn't care this. "
+                            + "Since the serializer will be set by a specific implementation of `KafkaAbstractSink`.")
     private String valueSerializerClass = "org.apache.kafka.common.serialization.ByteArraySerializer";
     @FieldDoc(
-        defaultValue = "",
-        help =
-            "The producer config properties to be passed to Producer. Note that other properties specified "
-          + "in the connector config file take precedence over this config.")
+            defaultValue = "",
+            help =
+                    "The producer config properties to be passed to Producer. Note that other properties specified "
+                            + "in the connector config file take precedence over this config.")
     private Map<String, Object> producerConfigProperties;
 
     public static KafkaSinkConfig load(String yamlFile) throws IOException {
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
index f5fe46b..c04f654 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.io.kafka.sink;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
@@ -221,6 +222,24 @@ public class KafkaAbstractSinkTest {
         assertEquals("1", props.getProperty(ProducerConfig.ACKS_CONFIG));
     }
 
+    @Test
+    public final void loadFromSaslYamlFileTest() throws IOException {
+        File yamlFile = getFile("kafkaSinkConfigSasl.yaml");
+        KafkaSinkConfig config = KafkaSinkConfig.load(yamlFile.getAbsolutePath());
+        assertNotNull(config);
+        assertEquals(config.getBootstrapServers(), "localhost:6667");
+        assertEquals(config.getTopic(), "test");
+        assertEquals(config.getAcks(), "1");
+        assertEquals(config.getBatchSize(), 16384L);
+        assertEquals(config.getMaxRequestSize(), 1048576L);
+        assertEquals(config.getSecurityProtocol(), SecurityProtocol.SASL_PLAINTEXT.name);
+        assertEquals(config.getSaslMechanism(), "PLAIN");
+        assertEquals(config.getSaslJaasConfig(), "org.apache.kafka.common.security.plain.PlainLoginModule required \nusername=\"alice\" \npassword=\"pwd\";");
+        assertEquals(config.getSslEndpointIdentificationAlgorithm(), "");
+        assertEquals(config.getSslTruststoreLocation(), "/etc/cert.pem");
+        assertEquals(config.getSslTruststorePassword(), "cert_pwd");
+    }
+
     private File getFile(String name) {
         ClassLoader classLoader = getClass().getClassLoader();
         return new File(classLoader.getResource(name).getFile());
diff --git a/pulsar-io/kafka/src/test/resources/kafkaSinkConfigSasl.yaml b/pulsar-io/kafka/src/test/resources/kafkaSinkConfigSasl.yaml
new file mode 100644
index 0000000..384d2fd
--- /dev/null
+++ b/pulsar-io/kafka/src/test/resources/kafkaSinkConfigSasl.yaml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"bootstrapServers": "localhost:6667"
+"topic": "test"
+"acks": "1"
+"securityProtocol": "SASL_PLAINTEXT"
+"saslMechanism" : "PLAIN"
+"saslJaasConfig" : "org.apache.kafka.common.security.plain.PlainLoginModule required \nusername=\"alice\" \npassword=\"pwd\";"
+"sslEnabledProtocols" : "TLSv1.2"
+"sslEndpointIdentificationAlgorithm" : ""
+"sslTruststoreLocation" : "/etc/cert.pem"
+"sslTruststorePassword" : "cert_pwd"
\ No newline at end of file