Posted to commits@spark.apache.org by va...@apache.org on 2018/12/13 00:46:05 UTC

[spark] branch master updated: [SPARK-26322][SS] Add spark.kafka.sasl.token.mechanism to ease delegation token configuration.

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6daa783  [SPARK-26322][SS] Add spark.kafka.sasl.token.mechanism to ease delegation token configuration.
6daa783 is described below

commit 6daa78309460e338dd688cf6cdbd46a12666f72e
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Wed Dec 12 16:45:50 2018 -0800

    [SPARK-26322][SS] Add spark.kafka.sasl.token.mechanism to ease delegation token configuration.
    
    ## What changes were proposed in this pull request?
    
    When a Kafka delegation token is obtained, a SCRAM `sasl.mechanism` has to be configured for authentication. Until now this had to be set on each related source/sink, which is inconvenient from the user's perspective. Such granularity is not required, and the setting can be provided through one central parameter.
    
    In this PR, `spark.kafka.sasl.token.mechanism` is added to configure this centrally (default: `SCRAM-SHA-512`).
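    
    A minimal usage sketch (broker addresses, topic name, and the SCRAM-SHA-256 choice are illustrative; the mechanism must match the broker configuration); the mechanism is set once on the Spark configuration instead of on every source/sink:
    
        import org.apache.spark.sql.SparkSession
        
        // Illustrative values only. On a secured cluster, setting spark.kafka.bootstrap.servers
        // lets the delegation token provider obtain a token; the token SASL mechanism is
        // configured centrally (here overriding the SCRAM-SHA-512 default).
        val spark = SparkSession.builder()
          .config("spark.kafka.bootstrap.servers", "host1:port1,host2:port2")
          .config("spark.kafka.sasl.token.mechanism", "SCRAM-SHA-256")
          .getOrCreate()
        
        // The source no longer needs a per-source "kafka.sasl.mechanism" option.
        val df = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
          .option("subscribe", "topic1")
          .load()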
    
    ## How was this patch tested?
    
    Existing unit tests + on cluster.
    
    Closes #23274 from gaborgsomogyi/SPARK-26322.
    
    Authored-by: Gabor Somogyi <ga...@gmail.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../org/apache/spark/internal/config/Kafka.scala   |   9 ++
 docs/structured-streaming-kafka-integration.md     | 144 +--------------------
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |  15 ++-
 3 files changed, 21 insertions(+), 147 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
index 064fc93..e91ddd3 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
@@ -79,4 +79,13 @@ private[spark] object Kafka {
         "For further details please see kafka documentation. Only used to obtain delegation token.")
       .stringConf
       .createOptional
+
+  val TOKEN_SASL_MECHANISM =
+    ConfigBuilder("spark.kafka.sasl.token.mechanism")
+      .doc("SASL mechanism used for client connections with delegation token. Because SCRAM " +
+        "login module used for authentication a compatible mechanism has to be set here. " +
+        "For further details please see kafka documentation (sasl.mechanism). Only used to " +
+        "authenticate against Kafka broker with delegation token.")
+      .stringConf
+      .createWithDefault("SCRAM-SHA-512")
 }
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 7040f8d..3d64ec4 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -642,9 +642,9 @@ This way the application can be configured via Spark parameters and may not need
 configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information
 about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).
 
-The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers`,
+The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set,
 Spark considers the following log in options, in order of preference:
-- **JAAS login configuration**
+- **JAAS login configuration**; see the example below.
 - **Keytab file**, such as,
 
       ./bin/spark-submit \
@@ -669,144 +669,8 @@ Kafka broker configuration):
 
 After obtaining delegation token successfully, Spark distributes it across nodes and renews it accordingly.
 Delegation token uses `SCRAM` login module for authentication and because of that the appropriate
-`sasl.mechanism` has to be configured on source/sink (it must match with Kafka broker configuration):
-
-<div class="codetabs">
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-// Setting on Kafka Source for Streaming Queries
-val df = spark
-  .readStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .as[(String, String)]
-
-// Setting on Kafka Source for Batch Queries
-val df = spark
-  .read
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .as[(String, String)]
-
-// Setting on Kafka Sink for Streaming Queries
-val ds = df
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .writeStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .start()
-
-// Setting on Kafka Sink for Batch Queries
-val ds = df
-  .selectExpr("topic1", "CAST(key AS STRING)", "CAST(value AS STRING)")
-  .write
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .save()
-
-{% endhighlight %}
-</div>
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-// Setting on Kafka Source for Streaming Queries
-Dataset<Row> df = spark
-  .readStream()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load();
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
-
-// Setting on Kafka Source for Batch Queries
-Dataset<Row> df = spark
-  .read()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load();
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
-
-// Setting on Kafka Sink for Streaming Queries
-StreamingQuery ds = df
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .writeStream()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .start();
-
-// Setting on Kafka Sink for Batch Queries
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .write()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .save();
-
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-
-// Setting on Kafka Source for Streaming Queries
-df = spark \
-  .readStream \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("subscribe", "topic1") \
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
-// Setting on Kafka Source for Batch Queries
-df = spark \
-  .read \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("subscribe", "topic1") \
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
-// Setting on Kafka Sink for Streaming Queries
-ds = df \
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
-  .writeStream \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("topic", "topic1") \
-  .start()
-
-// Setting on Kafka Sink for Batch Queries
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
-  .write \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("topic", "topic1") \
-  .save()
-
-{% endhighlight %}
-</div>
-</div>
+`spark.kafka.sasl.token.mechanism` (default: `SCRAM-SHA-512`) has to be configured. This parameter
+must also match the Kafka broker configuration.
 
 When delegation token is available on an executor it can be overridden with JAAS login configuration.
 
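To complement the shortened documentation above, a minimal sink-side sketch (broker addresses and topic name are illustrative): the batch write below passes no per-sink `kafka.sasl.mechanism` option, since the mechanism is taken from `spark.kafka.sasl.token.mechanism` when a delegation token is used.

    import org.apache.spark.sql.SparkSession
    
    // Illustrative only; assumes a session on a secured cluster where a Kafka
    // delegation token has already been obtained via spark.kafka.bootstrap.servers.
    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._
    
    // Batch write without any per-sink "kafka.sasl.mechanism" option.
    Seq(("k1", "v1"), ("k2", "v2")).toDF("key", "value")
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("topic", "topic1")
      .save()
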
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 0ac3304..6a0c208 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 import org.apache.spark.SparkEnv
 import org.apache.spark.deploy.security.KafkaTokenUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
@@ -501,7 +502,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       // If buffer config is not set, set it to reasonable value to work around
       // buffer issues (see KAFKA-3135)
       .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-      .setTokenJaasConfigIfNeeded()
+      .setAuthenticationConfigIfNeeded()
       .build()
 
   def kafkaParamsForExecutors(
@@ -523,7 +524,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       // If buffer config is not set, set it to reasonable value to work around
       // buffer issues (see KAFKA-3135)
       .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-      .setTokenJaasConfigIfNeeded()
+      .setAuthenticationConfigIfNeeded()
       .build()
 
   /**
@@ -556,7 +557,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       this
     }
 
-    def setTokenJaasConfigIfNeeded(): ConfigUpdater = {
+    def setAuthenticationConfigIfNeeded(): ConfigUpdater = {
       // There are multiple possibilities to log in and applied in the following order:
       // - JVM global security provided -> try to log in with JVM global security configuration
       //   which can be configured for example with 'java.security.auth.login.config'.
@@ -568,11 +569,11 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       } else if (KafkaSecurityHelper.isTokenAvailable()) {
         logDebug("Delegation token detected, using it for login.")
         val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
-        val mechanism = kafkaParams
-          .getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM)
+        set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+        val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
         require(mechanism.startsWith("SCRAM"),
           "Delegation token works only with SCRAM mechanism.")
-        set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+        set(SaslConfigs.SASL_MECHANISM, mechanism)
       }
       this
     }
@@ -600,7 +601,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
     ConfigUpdater("executor", specifiedKafkaParams)
       .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
       .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
-      .setTokenJaasConfigIfNeeded()
+      .setAuthenticationConfigIfNeeded()
       .build()
   }
 

