You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2018/12/13 00:49:33 UTC

[GitHub] asfgit closed pull request #23274: [SPARK-26322][SS] Add spark.kafka.sasl.token.mechanism to ease delegation token configuration.

asfgit closed pull request #23274: [SPARK-26322][SS] Add spark.kafka.sasl.token.mechanism to ease delegation token configuration.
URL: https://github.com/apache/spark/pull/23274
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
index 85d74c27142ad..88c612c2d951a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
@@ -79,4 +79,13 @@ private[spark] object Kafka {
         "For further details please see kafka documentation. Only used to obtain delegation token.")
       .stringConf
       .createOptional
+
+  val TOKEN_SASL_MECHANISM =
+    ConfigBuilder("spark.kafka.sasl.token.mechanism")
+      .doc("SASL mechanism used for client connections with delegation token. Because SCRAM " +
+        "login module used for authentication a compatible mechanism has to be set here. " +
+        "For further details please see kafka documentation (sasl.mechanism). Only used to " +
+        "authenticate against Kafka broker with delegation token.")
+      .stringConf
+      .createWithDefault("SCRAM-SHA-512")
 }
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 7040f8da2c614..3d64ec4cb55f7 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -642,9 +642,9 @@ This way the application can be configured via Spark parameters and may not need
 configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information
 about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).
 
-The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers`,
+The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set,
 Spark considers the following log in options, in order of preference:
-- **JAAS login configuration**
+- **JAAS login configuration** (see the example below).
 - **Keytab file**, such as,
 
       ./bin/spark-submit \
@@ -669,144 +669,8 @@ Kafka broker configuration):
 
 After obtaining delegation token successfully, Spark distributes it across nodes and renews it accordingly.
 Delegation token uses `SCRAM` login module for authentication and because of that the appropriate
-`sasl.mechanism` has to be configured on source/sink (it must match with Kafka broker configuration):
-
-<div class="codetabs">
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-// Setting on Kafka Source for Streaming Queries
-val df = spark
-  .readStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .as[(String, String)]
-
-// Setting on Kafka Source for Batch Queries
-val df = spark
-  .read
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .as[(String, String)]
-
-// Setting on Kafka Sink for Streaming Queries
-val ds = df
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .writeStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .start()
-
-// Setting on Kafka Sink for Batch Queries
-val ds = df
-  .selectExpr("topic1", "CAST(key AS STRING)", "CAST(value AS STRING)")
-  .write
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .save()
-
-{% endhighlight %}
-</div>
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-// Setting on Kafka Source for Streaming Queries
-Dataset<Row> df = spark
-  .readStream()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load();
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
-
-// Setting on Kafka Source for Batch Queries
-Dataset<Row> df = spark
-  .read()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load();
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
-
-// Setting on Kafka Sink for Streaming Queries
-StreamingQuery ds = df
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .writeStream()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .start();
-
-// Setting on Kafka Sink for Batch Queries
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .write()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .save();
-
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-
-// Setting on Kafka Source for Streaming Queries
-df = spark \
-  .readStream \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("subscribe", "topic1") \
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
-// Setting on Kafka Source for Batch Queries
-df = spark \
-  .read \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("subscribe", "topic1") \
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
-// Setting on Kafka Sink for Streaming Queries
-ds = df \
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
-  .writeStream \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("topic", "topic1") \
-  .start()
-
-// Setting on Kafka Sink for Batch Queries
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
-  .write \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("topic", "topic1") \
-  .save()
-
-{% endhighlight %}
-</div>
-</div>
+`spark.kafka.sasl.token.mechanism` (default: `SCRAM-SHA-512`) has to be configured. Also, this parameter
+must match the Kafka broker configuration.
 
 When delegation token is available on an executor it can be overridden with JAAS login configuration.
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
index 74d5ef9c05f14..a9b76def4f8be 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.kafka010
 
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 import org.apache.kafka.common.security.scram.ScramLoginModule
 
 import org.apache.spark.SparkConf
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 0ac330435e5c5..6a0c2088ac3d1 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 import org.apache.spark.SparkEnv
 import org.apache.spark.deploy.security.KafkaTokenUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
@@ -501,7 +502,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       // If buffer config is not set, set it to reasonable value to work around
       // buffer issues (see KAFKA-3135)
       .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-      .setTokenJaasConfigIfNeeded()
+      .setAuthenticationConfigIfNeeded()
       .build()
 
   def kafkaParamsForExecutors(
@@ -523,7 +524,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       // If buffer config is not set, set it to reasonable value to work around
       // buffer issues (see KAFKA-3135)
       .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-      .setTokenJaasConfigIfNeeded()
+      .setAuthenticationConfigIfNeeded()
       .build()
 
   /**
@@ -556,7 +557,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       this
     }
 
-    def setTokenJaasConfigIfNeeded(): ConfigUpdater = {
+    def setAuthenticationConfigIfNeeded(): ConfigUpdater = {
       // There are multiple possibilities to log in and applied in the following order:
       // - JVM global security provided -> try to log in with JVM global security configuration
       //   which can be configured for example with 'java.security.auth.login.config'.
@@ -568,11 +569,11 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       } else if (KafkaSecurityHelper.isTokenAvailable()) {
         logDebug("Delegation token detected, using it for login.")
         val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
-        val mechanism = kafkaParams
-          .getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM)
+        set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+        val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
         require(mechanism.startsWith("SCRAM"),
           "Delegation token works only with SCRAM mechanism.")
-        set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+        set(SaslConfigs.SASL_MECHANISM, mechanism)
       }
       this
     }
@@ -600,7 +601,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
     ConfigUpdater("executor", specifiedKafkaParams)
       .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
       .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
-      .setTokenJaasConfigIfNeeded()
+      .setAuthenticationConfigIfNeeded()
       .build()
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org