Posted to commits@flink.apache.org by re...@apache.org on 2022/07/06 02:33:02 UTC

[flink] branch release-1.14 updated: [FLINK-27865][docs] Add example for configuring SASL and SSL in Kafka DataStream and SQL connector (#19904)

This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new d7162394ebd [FLINK-27865][docs] Add example for configuring SASL and SSL in Kafka DataStream and SQL connector (#19904)
d7162394ebd is described below

commit d7162394ebd95bc70c987c6d9d23a7a50288f104
Author: Qingsheng Ren <re...@gmail.com>
AuthorDate: Thu Jun 23 10:58:50 2022 +0800

    [FLINK-27865][docs] Add example for configuring SASL and SSL in Kafka DataStream and SQL connector (#19904)
    
    (cherry picked from commit 5d564b1fe9dfaf9a07cc9af2726f71ed31d582ed)
---
 docs/content.zh/docs/connectors/table/kafka.md   | 49 ++++++++++++++++++++++
 docs/content/docs/connectors/datastream/kafka.md | 44 ++++++++++++++++----
 docs/content/docs/connectors/table/kafka.md      | 52 ++++++++++++++++++++++++
 3 files changed, 137 insertions(+), 8 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/kafka.md b/docs/content.zh/docs/connectors/table/kafka.md
index b6aefd031b2..e0dd74b2fac 100644
--- a/docs/content.zh/docs/connectors/table/kafka.md
+++ b/docs/content.zh/docs/connectors/table/kafka.md
@@ -530,6 +530,55 @@ The watermark emitted by the source is determined by the minimum watermark among the partitions it reads.
 
 Please refer to [Kafka watermark strategies]({{< ref "docs/dev/datastream/event-time/generating_watermarks" >}}#watermark-策略和-kafka-连接器) for more details.
 
+### Security
+To enable security configurations, including encryption and authentication, simply set the security options with the "properties." prefix
+on the Kafka table. The code snippet below shows how to configure the Kafka table to use PLAIN as the SASL mechanism and provide the JAAS configuration:
+```sql
+CREATE TABLE KafkaTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `behavior` STRING,
+  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
+) WITH (
+  'connector' = 'kafka',
+  ...
+  'properties.security.protocol' = 'SASL_PLAINTEXT',
+  'properties.sasl.mechanism' = 'PLAIN',
+  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
+)
+```
+For a more complex example, use SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism:
+```sql
+CREATE TABLE KafkaTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `behavior` STRING,
+  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
+) WITH (
+  'connector' = 'kafka',
+  ...
+  'properties.security.protocol' = 'SASL_SSL',
+  /* SSL configurations */
+  /* Configure the path of the truststore (CA certificate) provided by the server */
+  'properties.ssl.truststore.location' = '/path/to/kafka.client.truststore.jks',
+  'properties.ssl.truststore.password' = 'test1234',
+  /* Configure the path of the keystore (private key) if client authentication is required */
+  'properties.ssl.keystore.location' = '/path/to/kafka.client.keystore.jks',
+  'properties.ssl.keystore.password' = 'test1234',
+  /* SASL configurations */
+  /* Set the SASL mechanism to SCRAM-SHA-256 */
+  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
+  /* Configure JAAS */
+  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";'
+)
+```
+
+If the Kafka client dependencies are relocated (shaded) in the job JAR, the class path of the login module may differ,
+so rewrite the configuration above with the login module's actual class path in the JAR. For example, the SQL client JAR
+relocates the Kafka client dependencies under `org.apache.flink.kafka.shaded.org.apache.kafka`, so the class path of the
+plain login module should be `org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule`.
+
+For a detailed description of the security configurations, please refer to <a href="https://kafka.apache.org/documentation/#security">the "Security" section in the Apache Kafka documentation</a>.
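As a hypothetical fragment showing the rewrite described above (assuming the `org.apache.flink.kafka.shaded` relocation used by the SQL client JAR; your JAR's shading pattern may differ):
```sql
CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  /* Login module rewritten with the relocated (shaded) class path */
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
)
```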
+
Data Type Mapping
 ----------------
 
diff --git a/docs/content/docs/connectors/datastream/kafka.md b/docs/content/docs/connectors/datastream/kafka.md
index 263d0cacef6..84ae843d3dd 100644
--- a/docs/content/docs/connectors/datastream/kafka.md
+++ b/docs/content/docs/connectors/datastream/kafka.md
@@ -165,14 +165,6 @@ it is configured:
 - ```partition.discovery.interval.ms``` is overridden to -1 when
   ```setBounded(OffsetsInitializer)``` has been invoked
 
-The code snippet below shows configuring KafkaConsumer to use "PLAIN" as SASL mechanism and provide
-JAAS configuration:
-```java
-KafkaSource.builder()
-    .setProperty("sasl.mechanism", "PLAIN")
-    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
-```
-
 ### Dynamic Partition Discovery
 In order to handle scenarios like topic scaling-out or topic creation without restarting the Flink
 job, Kafka source can be configured to periodically discover new partitions under provided 
@@ -314,6 +306,42 @@ available metrics are correctly forwarded to the metrics system. You must ensure
 ```client.id.prefix``` for every ```KafkaSource``` is configured and that no other
 ```KafkaConsumer``` in your job uses the same ```client.id```.
 
+### Security
+To enable security configurations, including encryption and authentication, set the security options as additional
+properties on the Kafka source. The code snippet below shows how to configure the Kafka source to use PLAIN as the SASL
+mechanism and provide the JAAS configuration:
+
+```java
+KafkaSource.builder()
+    .setProperty("security.protocol", "SASL_PLAINTEXT")
+    .setProperty("sasl.mechanism", "PLAIN")
+    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
+```
+
+For a more complex example, use SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism:
+```java
+KafkaSource.builder()
+    .setProperty("security.protocol", "SASL_SSL")
+    // SSL configurations
+    // Configure the path of the truststore (CA certificate) provided by the server
+    .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
+    .setProperty("ssl.truststore.password", "test1234")
+    // Configure the path of the keystore (private key) if client authentication is required
+    .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
+    .setProperty("ssl.keystore.password", "test1234")
+    // SASL configurations
+    // Set the SASL mechanism to SCRAM-SHA-256
+    .setProperty("sasl.mechanism", "SCRAM-SHA-256")
+    // Set the JAAS configuration
+    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");
+```
+
+Please note that the class path of the login module in `sasl.jaas.config` might differ if you relocate the Kafka client
+dependencies in the job JAR, so you may need to rewrite it with the module's actual class path in the JAR.
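As a minimal self-contained sketch (plain JDK, no Flink classes; the `org.apache.flink.kafka.shaded` prefix is the relocation pattern used by the SQL client JAR and is only an assumption for other builds), the relocated JAAS entry could be assembled like this:

```java
import java.util.Properties;

public class RelocatedJaas {
    // Hypothetical relocation prefix; check the actual shading pattern of your job JAR.
    static final String SHADED_PREFIX = "org.apache.flink.kafka.shaded.";

    // Prepend the relocation prefix to the original login module class name.
    static String relocate(String loginModule) {
        return SHADED_PREFIX + loginModule;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("sasl.jaas.config",
                relocate("org.apache.kafka.common.security.plain.PlainLoginModule")
                        + " required username=\"username\" password=\"password\";");
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

The resulting string can then be passed to `setProperty("sasl.jaas.config", ...)` exactly as in the snippets above.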
+
+For detailed explanations of security configurations, please refer to
+<a href="https://kafka.apache.org/documentation/#security">the "Security" section in Apache Kafka documentation</a>.
+
 ### Behind the Scene
 {{< hint info >}}
 If you are interested in how Kafka source works under the design of new data source API, you may
diff --git a/docs/content/docs/connectors/table/kafka.md b/docs/content/docs/connectors/table/kafka.md
index bb65d528b60..9453434486c 100644
--- a/docs/content/docs/connectors/table/kafka.md
+++ b/docs/content/docs/connectors/table/kafka.md
@@ -562,6 +562,58 @@ option in the table configuration.
 Please refer to [Kafka watermark strategies]({{< ref "docs/dev/datastream/event-time/generating_watermarks" >}}#watermark-strategies-and-the-kafka-connector)
 for more details.
 
+### Security
+To enable security configurations, including encryption and authentication, set the security options with the
+"properties." prefix in the table options. The code snippet below shows how to configure the Kafka table to use PLAIN as
+the SASL mechanism and provide the JAAS configuration:
+```sql
+CREATE TABLE KafkaTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `behavior` STRING,
+  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
+) WITH (
+  'connector' = 'kafka',
+  ...
+  'properties.security.protocol' = 'SASL_PLAINTEXT',
+  'properties.sasl.mechanism' = 'PLAIN',
+  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
+)
+```
+For a more complex example, use SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism:
+```sql
+CREATE TABLE KafkaTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `behavior` STRING,
+  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
+) WITH (
+  'connector' = 'kafka',
+  ...
+  'properties.security.protocol' = 'SASL_SSL',
+  /* SSL configurations */
+  /* Configure the path of the truststore (CA certificate) provided by the server */
+  'properties.ssl.truststore.location' = '/path/to/kafka.client.truststore.jks',
+  'properties.ssl.truststore.password' = 'test1234',
+  /* Configure the path of the keystore (private key) if client authentication is required */
+  'properties.ssl.keystore.location' = '/path/to/kafka.client.keystore.jks',
+  'properties.ssl.keystore.password' = 'test1234',
+  /* SASL configurations */
+  /* Set the SASL mechanism to SCRAM-SHA-256 */
+  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
+  /* Set the JAAS configuration */
+  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";'
+)
+```
+
+Please note that the class path of the login module in `sasl.jaas.config` might differ if you relocate the Kafka client
+dependencies, so you may need to rewrite it with the module's actual class path in the JAR. For example, the SQL client
+JAR relocates the Kafka client dependencies to `org.apache.flink.kafka.shaded.org.apache.kafka`, so the path of the plain
+login module should be `org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule` instead.
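As a hypothetical fragment, mirroring the first example above but rewritten for the SQL client JAR's relocated class path (other JARs may use a different shading pattern):
```sql
CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  /* Login module rewritten with the relocated (shaded) class path */
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
)
```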
+
+For detailed explanations of security configurations, please refer to
+<a href="https://kafka.apache.org/documentation/#security">the "Security" section in Apache Kafka documentation</a>.
+
 Data Type Mapping
 ----------------