Posted to commits@spark.apache.org by va...@apache.org on 2018/12/06 18:59:25 UTC
spark git commit: [SPARK-26236][SS] Add kafka delegation token
support documentation.
Repository: spark
Updated Branches:
refs/heads/master ecaa495b1 -> b14a26ee5
[SPARK-26236][SS] Add kafka delegation token support documentation.
## What changes were proposed in this pull request?
Kafka delegation token support was implemented in [PR#22598](https://github.com/apache/spark/pull/22598), but that PR did not include documentation because the code was changing rapidly. Now that it has been merged, this PR documents it.
## How was this patch tested?
jekyll build + manual html check
Closes #23195 from gaborgsomogyi/SPARK-26236.
Authored-by: Gabor Somogyi <ga...@gmail.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b14a26ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b14a26ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b14a26ee
Branch: refs/heads/master
Commit: b14a26ee5764aa98472bc69ab1dec408b89bc78a
Parents: ecaa495
Author: Gabor Somogyi <ga...@gmail.com>
Authored: Thu Dec 6 10:59:20 2018 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Dec 6 10:59:20 2018 -0800
----------------------------------------------------------------------
docs/structured-streaming-kafka-integration.md | 216 +++++++++++++++++++-
1 file changed, 206 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b14a26ee/docs/structured-streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index a549ce2..7040f8d 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -66,8 +66,8 @@ Dataset<Row> df = spark
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
- .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to multiple topics
Dataset<Row> df = spark
@@ -75,8 +75,8 @@ Dataset<Row> df = spark
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
- .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to a pattern
Dataset<Row> df = spark
@@ -84,8 +84,8 @@ Dataset<Row> df = spark
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
- .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
{% endhighlight %}
</div>
@@ -479,7 +479,7 @@ StreamingQuery ds = df
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
- .start()
+ .start();
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
@@ -487,7 +487,7 @@ StreamingQuery ds = df
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .start()
+ .start();
{% endhighlight %}
</div>
@@ -547,14 +547,14 @@ df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
- .save()
+ .save();
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .save()
+ .save();
{% endhighlight %}
</div>
@@ -624,3 +624,199 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark-
See [Application Submission Guide](submitting-applications.html) for more details about submitting
applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increase security in a cluster. For a detailed
+description of these features, see [Kafka security docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against a Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**
+- **JAAS login configuration**
+
+### Delegation token
+
+This way the application can be configured via Spark parameters and may not need JAAS login
+configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information
+about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set,
+Spark considers the following login options, in order of preference:
+- **JAAS login configuration**
+- **Keytab file**, such as,
+
+ ./bin/spark-submit \
+ --keytab <KEYTAB_FILE> \
+ --principal <PRINCIPAL> \
+ --conf spark.kafka.bootstrap.servers=<KAFKA_SERVERS> \
+ ...
+
+- **Kerberos credential cache**, such as,
+
+ ./bin/spark-submit \
+ --conf spark.kafka.bootstrap.servers=<KAFKA_SERVERS> \
+ ...
+
+The Kafka delegation token provider can be turned off by setting `spark.security.credentials.kafka.enabled` to `false` (default: `true`).
+
+Spark can be configured to use the following authentication protocols to obtain a token (the protocol must match the
+Kafka broker configuration):
+- **SASL SSL (default)**
+- **SSL**
+- **SASL PLAINTEXT (for testing)**
+
+After obtaining a delegation token successfully, Spark distributes it across nodes and renews it accordingly.
+A delegation token uses the `SCRAM` login module for authentication, so the appropriate
+`sasl.mechanism` has to be configured on the source/sink (it must match the Kafka broker configuration):
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+// Setting on Kafka Source for Streaming Queries
+val df = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+ .option("subscribe", "topic1")
+ .load()
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+
+// Setting on Kafka Source for Batch Queries
+val df = spark
+ .read
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+ .option("subscribe", "topic1")
+ .load()
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+
+// Setting on Kafka Sink for Streaming Queries
+val ds = df
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .writeStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+ .option("topic", "topic1")
+ .start()
+
+// Setting on Kafka Sink for Batch Queries
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .write
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+ .option("topic", "topic1")
+ .save()
+
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+// Setting on Kafka Source for Streaming Queries
+Dataset<Row> df = spark
+ .readStream()
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+ .option("subscribe", "topic1")
+ .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+
+// Setting on Kafka Source for Batch Queries
+Dataset<Row> df = spark
+ .read()
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+ .option("subscribe", "topic1")
+ .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+
+// Setting on Kafka Sink for Streaming Queries
+StreamingQuery ds = df
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .writeStream()
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+ .option("topic", "topic1")
+ .start();
+
+// Setting on Kafka Sink for Batch Queries
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .write()
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+ .option("topic", "topic1")
+ .save();
+
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+# Setting on Kafka Source for Streaming Queries
+df = spark \
+ .readStream \
+ .format("kafka") \
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+ .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
+ .option("subscribe", "topic1") \
+ .load()
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+
+# Setting on Kafka Source for Batch Queries
+df = spark \
+ .read \
+ .format("kafka") \
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+ .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
+ .option("subscribe", "topic1") \
+ .load()
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+
+# Setting on Kafka Sink for Streaming Queries
+ds = df \
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+ .writeStream \
+ .format("kafka") \
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+ .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
+ .option("topic", "topic1") \
+ .start()
+
+# Setting on Kafka Sink for Batch Queries
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+ .write \
+ .format("kafka") \
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+ .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
+ .option("topic", "topic1") \
+ .save()
+
+{% endhighlight %}
+</div>
+</div>
+
+When a delegation token is available on an executor, it can be overridden with a JAAS login configuration.
+
+### JAAS login configuration
+
+A JAAS login configuration must be placed on all nodes where Spark tries to access the Kafka cluster.
+This makes it possible to apply any custom authentication logic, at a higher maintenance cost.
+This can be done in several ways. One possibility is to provide additional JVM parameters, such as,
+
+ ./bin/spark-submit \
+ --driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
+ --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
+ ...
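
Reader's note, not part of the commit: the source and sink examples in this patch repeat the same two connector options, `kafka.bootstrap.servers` and `kafka.sasl.mechanism`. A minimal Python sketch of keeping them in one place might look like the following. The helper name `kafka_token_options` and its signature are hypothetical and are not provided by Spark; the host and port values are the placeholders used in the patch.

```python
from typing import Dict


# Hypothetical helper for illustration only; Spark does not ship this function.
def kafka_token_options(bootstrap_servers: str,
                        sasl_mechanism: str = "SCRAM-SHA-512") -> Dict[str, str]:
    """Return the connector options shared by the source and sink examples."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        # Must match the SCRAM mechanism enabled on the Kafka brokers.
        "kafka.sasl.mechanism": sasl_mechanism,
    }


opts = kafka_token_options("host1:port1,host2:port2")

# With an active SparkSession named `spark` (assumed, not created here),
# the same dictionary could be passed to a reader or a writer, for example:
#   df = spark.readStream.format("kafka").options(**opts) \
#       .option("subscribe", "topic1").load()
```

Keeping the mechanism in a single dictionary makes it less likely that a source and a sink end up configured with different `sasl.mechanism` values.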