Posted to commits@spark.apache.org by va...@apache.org on 2018/12/06 18:59:25 UTC

spark git commit: [SPARK-26236][SS] Add kafka delegation token support documentation.

Repository: spark
Updated Branches:
  refs/heads/master ecaa495b1 -> b14a26ee5


[SPARK-26236][SS] Add kafka delegation token support documentation.

## What changes were proposed in this pull request?

Kafka delegation token support was implemented in [PR#22598](https://github.com/apache/spark/pull/22598), but that PR didn't include documentation because of the rapid changes at the time. Now that it has been merged, this PR adds the documentation.

## How was this patch tested?
jekyll build + manual html check

Closes #23195 from gaborgsomogyi/SPARK-26236.

Authored-by: Gabor Somogyi <ga...@gmail.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b14a26ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b14a26ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b14a26ee

Branch: refs/heads/master
Commit: b14a26ee5764aa98472bc69ab1dec408b89bc78a
Parents: ecaa495
Author: Gabor Somogyi <ga...@gmail.com>
Authored: Thu Dec 6 10:59:20 2018 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Dec 6 10:59:20 2018 -0800

----------------------------------------------------------------------
 docs/structured-streaming-kafka-integration.md | 216 +++++++++++++++++++-
 1 file changed, 206 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b14a26ee/docs/structured-streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index a549ce2..7040f8d 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -66,8 +66,8 @@ Dataset<Row> df = spark
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
 
 // Subscribe to multiple topics
 Dataset<Row> df = spark
@@ -75,8 +75,8 @@ Dataset<Row> df = spark
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1,topic2")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
 
 // Subscribe to a pattern
 Dataset<Row> df = spark
@@ -84,8 +84,8 @@ Dataset<Row> df = spark
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribePattern", "topic.*")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
 
 {% endhighlight %}
 </div>
@@ -479,7 +479,7 @@ StreamingQuery ds = df
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("topic", "topic1")
-  .start()
+  .start();
 
 // Write key-value data from a DataFrame to Kafka using a topic specified in the data
 StreamingQuery ds = df
@@ -487,7 +487,7 @@ StreamingQuery ds = df
   .writeStream()
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .start()
+  .start();
 
 {% endhighlight %}
 </div>
@@ -547,14 +547,14 @@ df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("topic", "topic1")
-  .save()
+  .save();
 
 // Write key-value data from a DataFrame to Kafka using a topic specified in the data
 df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
   .write()
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .save()
+  .save();
 
 {% endhighlight %}
 </div>
@@ -624,3 +624,199 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increase security in a cluster. For a detailed
+description of these features, see [Kafka security docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against a Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**
+- **JAAS login configuration**
+
+### Delegation token
+
+With delegation tokens, the application can be configured via Spark parameters and may not need a
+JAAS login configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further
+information about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set,
+Spark considers the following login options, in order of preference:
+- **JAAS login configuration**
+- **Keytab file**, for example:
+
+      ./bin/spark-submit \
+          --keytab <KEYTAB_FILE> \
+          --principal <PRINCIPAL> \
+          --conf spark.kafka.bootstrap.servers=<KAFKA_SERVERS> \
+          ...
+
+- **Kerberos credential cache**, for example:
+
+      ./bin/spark-submit \
+          --conf spark.kafka.bootstrap.servers=<KAFKA_SERVERS> \
+          ...
+
+The Kafka delegation token provider can be turned off by setting `spark.security.credentials.kafka.enabled` to `false` (default: `true`).
+
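+For example, the provider can be disabled at submission time:
+
+    ./bin/spark-submit \
+        --conf spark.security.credentials.kafka.enabled=false \
+        ...
+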
+Spark can be configured to use the following authentication protocols to obtain tokens (the
+protocol must match the Kafka broker configuration):
+- **SASL SSL (default)**
+- **SSL**
+- **SASL PLAINTEXT (for testing)**
+
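+For example, forcing the plain `SSL` protocol at submission time might look like the following
+(the parameter name `spark.kafka.security.protocol` is the token provider setting assumed here;
+check the configuration docs of the version in use):
+
+    ./bin/spark-submit \
+        --conf spark.kafka.security.protocol=SSL \
+        --conf spark.kafka.bootstrap.servers=<KAFKA_SERVERS> \
+        ...
+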
+After a delegation token is obtained successfully, Spark distributes it across nodes and renews it
+as needed. Delegation tokens use the `SCRAM` login module for authentication, so the appropriate
+`sasl.mechanism` has to be configured on the source/sink (it must match the Kafka broker configuration):
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+// Setting on Kafka Source for Streaming Queries
+val df = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("subscribe", "topic1")
+  .load()
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+
+// Setting on Kafka Source for Batch Queries
+val df = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("subscribe", "topic1")
+  .load()
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+
+// Setting on Kafka Sink for Streaming Queries
+val ds = df
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("topic", "topic1")
+  .start()
+
+// Setting on Kafka Sink for Batch Queries
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("topic", "topic1")
+  .save()
+
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+// Setting on Kafka Source for Streaming Queries
+Dataset<Row> df = spark
+  .readStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("subscribe", "topic1")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+
+// Setting on Kafka Source for Batch Queries
+Dataset<Row> df = spark
+  .read()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("subscribe", "topic1")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+
+// Setting on Kafka Sink for Streaming Queries
+StreamingQuery ds = df
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("topic", "topic1")
+  .start();
+
+// Setting on Kafka Sink for Batch Queries
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("topic", "topic1")
+  .save();
+
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+# Setting on Kafka Source for Streaming Queries
+df = spark \
+  .readStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
+  .option("subscribe", "topic1") \
+  .load()
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+
+# Setting on Kafka Source for Batch Queries
+df = spark \
+  .read \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
+  .option("subscribe", "topic1") \
+  .load()
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+
+# Setting on Kafka Sink for Streaming Queries
+ds = df \
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
+  .option("topic", "topic1") \
+  .start()
+
+# Setting on Kafka Sink for Batch Queries
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .write \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
+  .option("topic", "topic1") \
+  .save()
+
+{% endhighlight %}
+</div>
+</div>
+
+When a delegation token is available on an executor, it can be overridden with a JAAS login configuration.
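+
+As a minimal sketch, such an override could be supplied through Kafka's standard
+`sasl.jaas.config` client property (prefixed with `kafka.` so Spark passes it to the Kafka
+client); the username and password below are placeholders:
+
+{% highlight scala %}
+// A sketch of overriding the delegation token with a dynamic JAAS configuration;
+// the credentials are placeholders.
+val df = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("kafka.sasl.jaas.config",
+    """org.apache.kafka.common.security.scram.ScramLoginModule required
+    username="user" password="password";""")
+  .option("subscribe", "topic1")
+  .load()
+{% endhighlight %}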
+
+### JAAS login configuration
+
+A JAAS login configuration must be placed on all nodes where Spark tries to access the Kafka cluster.
+This makes it possible to apply custom authentication logic, at a higher maintenance cost.
+This can be done in several ways. One possibility is to provide additional JVM parameters, such as:
+
+    ./bin/spark-submit \
+        --driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
+        --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
+        ...
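+
+As a minimal sketch, `custom_jaas.conf` could contain a Kerberos-based `KafkaClient` section
+(the login module shown is the standard JDK Kerberos module; the keytab path and principal are
+placeholders):
+
+    KafkaClient {
+        com.sun.security.auth.module.Krb5LoginModule required
+        useKeyTab=true
+        storeKey=true
+        keyTab="/path/to/kafka_client.keytab"
+        principal="client@EXAMPLE.COM";
+    };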

