You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2018/12/13 00:46:05 UTC
[spark] branch master updated: [SPARK-26322][SS] Add
spark.kafka.sasl.token.mechanism to ease delegation token configuration.
This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6daa783 [SPARK-26322][SS] Add spark.kafka.sasl.token.mechanism to ease delegation token configuration.
6daa783 is described below
commit 6daa78309460e338dd688cf6cdbd46a12666f72e
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Wed Dec 12 16:45:50 2018 -0800
[SPARK-26322][SS] Add spark.kafka.sasl.token.mechanism to ease delegation token configuration.
## What changes were proposed in this pull request?
When a Kafka delegation token is obtained, the SCRAM `sasl.mechanism` has to be configured for authentication. This can be configured on each related source/sink, which is inconvenient from the user's perspective. Such granularity is not required, and this configuration can be implemented with one central parameter.
In this PR, `spark.kafka.sasl.token.mechanism` is added to configure this centrally (default: `SCRAM-SHA-512`).
## How was this patch tested?
Existing unit tests, plus manual testing on a cluster.
Closes #23274 from gaborgsomogyi/SPARK-26322.
Authored-by: Gabor Somogyi <ga...@gmail.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
.../org/apache/spark/internal/config/Kafka.scala | 9 ++
docs/structured-streaming-kafka-integration.md | 144 +--------------------
.../spark/sql/kafka010/KafkaSourceProvider.scala | 15 ++-
3 files changed, 21 insertions(+), 147 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
index 064fc93..e91ddd3 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
@@ -79,4 +79,13 @@ private[spark] object Kafka {
"For further details please see kafka documentation. Only used to obtain delegation token.")
.stringConf
.createOptional
+
+ val TOKEN_SASL_MECHANISM =
+ ConfigBuilder("spark.kafka.sasl.token.mechanism")
+ .doc("SASL mechanism used for client connections with delegation token. Because SCRAM " +
+ "login module used for authentication a compatible mechanism has to be set here. " +
+ "For further details please see kafka documentation (sasl.mechanism). Only used to " +
+ "authenticate against Kafka broker with delegation token.")
+ .stringConf
+ .createWithDefault("SCRAM-SHA-512")
}
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 7040f8d..3d64ec4 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -642,9 +642,9 @@ This way the application can be configured via Spark parameters and may not need
configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information
about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).
-The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers`,
+The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set,
Spark considers the following log in options, in order of preference:
-- **JAAS login configuration**
+- **JAAS login configuration**, please see example below.
- **Keytab file**, such as,
./bin/spark-submit \
@@ -669,144 +669,8 @@ Kafka broker configuration):
After obtaining delegation token successfully, Spark distributes it across nodes and renews it accordingly.
Delegation token uses `SCRAM` login module for authentication and because of that the appropriate
-`sasl.mechanism` has to be configured on source/sink (it must match with Kafka broker configuration):
-
-<div class="codetabs">
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-// Setting on Kafka Source for Streaming Queries
-val df = spark
- .readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
- .option("subscribe", "topic1")
- .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
-
-// Setting on Kafka Source for Batch Queries
-val df = spark
- .read
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
- .option("subscribe", "topic1")
- .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
-
-// Setting on Kafka Sink for Streaming Queries
-val ds = df
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .writeStream
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
- .option("topic", "topic1")
- .start()
-
-// Setting on Kafka Sink for Batch Queries
-val ds = df
- .selectExpr("topic1", "CAST(key AS STRING)", "CAST(value AS STRING)")
- .write
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
- .save()
-
-{% endhighlight %}
-</div>
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-// Setting on Kafka Source for Streaming Queries
-Dataset<Row> df = spark
- .readStream()
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
- .option("subscribe", "topic1")
- .load();
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
-
-// Setting on Kafka Source for Batch Queries
-Dataset<Row> df = spark
- .read()
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
- .option("subscribe", "topic1")
- .load();
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
-
-// Setting on Kafka Sink for Streaming Queries
-StreamingQuery ds = df
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .writeStream()
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
- .option("topic", "topic1")
- .start();
-
-// Setting on Kafka Sink for Batch Queries
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .write()
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
- .option("topic", "topic1")
- .save();
-
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-
-// Setting on Kafka Source for Streaming Queries
-df = spark \
- .readStream \
- .format("kafka") \
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
- .option("subscribe", "topic1") \
- .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
-// Setting on Kafka Source for Batch Queries
-df = spark \
- .read \
- .format("kafka") \
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
- .option("subscribe", "topic1") \
- .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
-// Setting on Kafka Sink for Streaming Queries
-ds = df \
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
- .writeStream \
- .format("kafka") \
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
- .option("topic", "topic1") \
- .start()
-
-// Setting on Kafka Sink for Batch Queries
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
- .write \
- .format("kafka") \
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
- .option("topic", "topic1") \
- .save()
-
-{% endhighlight %}
-</div>
-</div>
+`spark.kafka.sasl.token.mechanism` (default: `SCRAM-SHA-512`) has to be configured. Also, this parameter
+must match with Kafka broker configuration.
When delegation token is available on an executor it can be overridden with JAAS login configuration.
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 0ac3304..6a0c208 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.spark.SparkEnv
import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
@@ -501,7 +502,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
// If buffer config is not set, set it to reasonable value to work around
// buffer issues (see KAFKA-3135)
.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
- .setTokenJaasConfigIfNeeded()
+ .setAuthenticationConfigIfNeeded()
.build()
def kafkaParamsForExecutors(
@@ -523,7 +524,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
// If buffer config is not set, set it to reasonable value to work around
// buffer issues (see KAFKA-3135)
.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
- .setTokenJaasConfigIfNeeded()
+ .setAuthenticationConfigIfNeeded()
.build()
/**
@@ -556,7 +557,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
this
}
- def setTokenJaasConfigIfNeeded(): ConfigUpdater = {
+ def setAuthenticationConfigIfNeeded(): ConfigUpdater = {
// There are multiple possibilities to log in and applied in the following order:
// - JVM global security provided -> try to log in with JVM global security configuration
// which can be configured for example with 'java.security.auth.login.config'.
@@ -568,11 +569,11 @@ private[kafka010] object KafkaSourceProvider extends Logging {
} else if (KafkaSecurityHelper.isTokenAvailable()) {
logDebug("Delegation token detected, using it for login.")
val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
- val mechanism = kafkaParams
- .getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM)
+ set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+ val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
require(mechanism.startsWith("SCRAM"),
"Delegation token works only with SCRAM mechanism.")
- set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+ set(SaslConfigs.SASL_MECHANISM, mechanism)
}
this
}
@@ -600,7 +601,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
ConfigUpdater("executor", specifiedKafkaParams)
.set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
.set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
- .setTokenJaasConfigIfNeeded()
+ .setAuthenticationConfigIfNeeded()
.build()
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org