Posted to commits@spark.apache.org by va...@apache.org on 2019/03/07 19:37:14 UTC

[spark] branch master updated: [SPARK-27022][DSTREAMS] Add kafka delegation token support.

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 98a8725  [SPARK-27022][DSTREAMS] Add kafka delegation token support.
98a8725 is described below

commit 98a8725e66ee992e8db7035449e73225a795b530
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Thu Mar 7 11:36:37 2019 -0800

    [SPARK-27022][DSTREAMS] Add kafka delegation token support.
    
    ## What changes were proposed in this pull request?
    
    It adds Kafka delegation token support for DStreams. Please be aware that, since no Kafka native sink is available for DStreams, this PR covers delegation token usage on the consumer side only.
    
    What this PR contains:
    * Usage of the token through a dynamically generated JAAS configuration (see the sketch below)
    * `KafkaConfigUpdater` moved to `kafka-0-10-token-provider`
    * `KafkaSecurityHelper` functionality moved into `KafkaTokenUtil`
    * Documentation
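    
    A minimal sketch of what the dynamically generated JAAS configuration looks like (mirroring `KafkaTokenUtil.getTokenJaasParams` in this change; the token id and password here are placeholders, while the real code reads them from the UGI credentials):
    
    ```scala
    import org.apache.kafka.common.security.scram.ScramLoginModule
    
    // Simplified illustration of the consumer-side dynamic JAAS config: the token
    // identifier and password become the SCRAM username/password, and the result is
    // set as `sasl.jaas.config` on the Kafka consumer parameters (which is what
    // KafkaConfigUpdater.setAuthenticationConfigIfNeeded() does).
    def scramJaasParams(tokenId: String, tokenPassword: String, serviceName: String): String = {
      val loginModule = classOf[ScramLoginModule].getName
      s"""
      |$loginModule required
      | tokenauth=true
      | serviceName="$serviceName"
      | username="$tokenId"
      | password="$tokenPassword";
      """.stripMargin.replace("\n", "")
    }
    
    val kafkaParams = Map[String, Object](
      "sasl.jaas.config" -> scramJaasParams("<token id>", "<token password>", "kafka")
    )
    ```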
    
    ## How was this patch tested?
    
    Existing unit tests + testing on a cluster.
    
    Long-running Kafka-to-file tests on a 4-node cluster with randomly thrown artificial exceptions.
    
    Test scenario:
    
    * 4 node cluster
    * Yarn
    * Kafka broker version 2.1.0
    * security.protocol = SASL_SSL
    * sasl.mechanism = SCRAM-SHA-512
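    
    A minimal sketch of the kind of Kafka-to-file DStreams consumer exercised in such a test (bootstrap servers, topic, group id and output path are placeholders; the delegation token itself is obtained and injected by Spark, so no JAAS config is set here):
    
    ```scala
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    
    val conf = new SparkConf().setAppName("kafka-delegation-token-test")
    val ssc = new StreamingContext(conf, Seconds(10))
    
    // Standard Kafka consumer params; the SASL_SSL / SCRAM-SHA-512 values match the
    // scenario above. sasl.jaas.config is filled in from the delegation token.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker-1:9093",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "kafka-delegation-token-test",
      "security.protocol" -> "SASL_SSL",
      "sasl.mechanism" -> "SCRAM-SHA-512"
    )
    
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("test-topic"), kafkaParams))
    
    // Kafka-to-file: persist the values and let the job run across several token renewals.
    stream.map(_.value).saveAsTextFiles("/tmp/kafka-delegation-token-test/out")
    ssc.start()
    ssc.awaitTermination()
    ```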
    
    Kafka broker settings:
    
    * delegation.token.expiry.time.ms=600000 (10 min)
    * delegation.token.max.lifetime.ms=1200000 (20 min)
    * delegation.token.expiry.check.interval.ms=300000 (5 min)
    
    Every 7.5 minutes a new delegation token is obtained from the Kafka broker (10 min * 0.75).
    When a token expires after 10 minutes (Spark obtains a new one and does not renew the old one), the broker's expiry thread runs every 5 minutes and invalidates the expired tokens. An artificial exception was then thrown inside the Spark application (in which case Spark closes the connection), and the latest delegation token was picked up correctly.
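    
    The renewal timing spelled out (values from the broker settings above; 0.75 is the renewal ratio referenced in the parentheses):
    
    ```scala
    // Renewal arithmetic for the scenario above (milliseconds).
    val expiryTimeMs = 600000L    // delegation.token.expiry.time.ms (10 min)
    val renewalRatio = 0.75       // fraction of the expiry interval after which a new token is obtained
    
    val obtainNewTokenEveryMs = (expiryTimeMs * renewalRatio).toLong
    println(s"New token obtained every ${obtainNewTokenEveryMs / 60000.0} minutes")  // 7.5
    ```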
    
    cd docs/
    SKIP_API=1 jekyll build
    Manual webpage check.
    
    Closes #23929 from gaborgsomogyi/SPARK-27022.
    
    Authored-by: Gabor Somogyi <ga...@gmail.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 docs/streaming-kafka-0-10-integration.md           |  7 +++
 .../spark/sql/kafka010/CachedKafkaProducer.scala   |  1 +
 .../spark/sql/kafka010/ConsumerStrategy.scala      |  2 +
 .../spark/sql/kafka010/KafkaDataConsumer.scala     |  1 +
 .../spark/sql/kafka010/KafkaSecurityHelper.scala   | 53 -------------------
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |  1 +
 .../sql/kafka010/KafkaSecurityHelperSuite.scala    | 43 ----------------
 external/kafka-0-10-token-provider/pom.xml         |  5 ++
 .../spark}/kafka010/KafkaConfigUpdater.scala       |  9 ++--
 .../org/apache/spark/kafka010/KafkaTokenUtil.scala | 30 ++++++++++-
 .../spark}/kafka010/KafkaConfigUpdaterSuite.scala  |  2 +-
 .../spark}/kafka010/KafkaDelegationTokenTest.scala |  3 +-
 .../spark/kafka010/KafkaTokenUtilSuite.scala       | 59 ++++++++--------------
 external/kafka-0-10/pom.xml                        |  5 ++
 .../streaming/kafka010/ConsumerStrategy.scala      | 19 +++++--
 .../streaming/kafka010/KafkaDataConsumer.scala     |  8 ++-
 16 files changed, 101 insertions(+), 147 deletions(-)

diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md
index 975adacca..3fb6271 100644
--- a/docs/streaming-kafka-0-10-integration.md
+++ b/docs/streaming-kafka-0-10-integration.md
@@ -315,3 +315,10 @@ As with any Spark applications, `spark-submit` is used to launch your applicatio
 
 For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-pro [...]
 
+### Security
+
+See [Structured Streaming Security](structured-streaming-kafka-integration.html#security).
+
+##### Additional Caveats
+
+- Kafka native sink is not available so delegation token used only on consumer side.
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
index f24001f..062ce9a 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
@@ -28,6 +28,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
+import org.apache.spark.kafka010.KafkaConfigUpdater
 
 private[kafka010] object CachedKafkaProducer extends Logging {
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
index dfdafce..2326619 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
@@ -25,6 +25,8 @@ import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.TopicPartition
 
+import org.apache.spark.kafka010.KafkaConfigUpdater
+
 /**
  * Subscribe allows you to subscribe to a fixed collection of topics.
  * SubscribePattern allows you to use a regex to specify topics of interest.
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index a0255a1..83bf4b1 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.internal.Logging
+import org.apache.spark.kafka010.KafkaConfigUpdater
 import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.util.UninterruptibleThread
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
deleted file mode 100644
index a11d54f..0000000
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.kafka.common.security.scram.ScramLoginModule
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
-import org.apache.spark.kafka010.KafkaTokenUtil
-
-private[kafka010] object KafkaSecurityHelper extends Logging {
-  def isTokenAvailable(): Boolean = {
-    UserGroupInformation.getCurrentUser().getCredentials.getToken(
-      KafkaTokenUtil.TOKEN_SERVICE) != null
-  }
-
-  def getTokenJaasParams(sparkConf: SparkConf): String = {
-    val token = UserGroupInformation.getCurrentUser().getCredentials.getToken(
-      KafkaTokenUtil.TOKEN_SERVICE)
-    val username = new String(token.getIdentifier)
-    val password = new String(token.getPassword)
-
-    val loginModuleName = classOf[ScramLoginModule].getName
-    val params =
-      s"""
-      |$loginModuleName required
-      | tokenauth=true
-      | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"
-      | username="$username"
-      | password="$password";
-      """.stripMargin.replace("\n", "")
-    logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", "password=\"[hidden]\"")}")
-
-    params
-  }
-}
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 4dc6955..b39e0d4 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.kafka010.KafkaConfigUpdater
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
deleted file mode 100644
index d908bbf..0000000
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-
-class KafkaSecurityHelperSuite extends SparkFunSuite with KafkaDelegationTokenTest {
-  test("isTokenAvailable without token should return false") {
-    assert(!KafkaSecurityHelper.isTokenAvailable())
-  }
-
-  test("isTokenAvailable with token should return true") {
-    addTokenToUGI()
-
-    assert(KafkaSecurityHelper.isTokenAvailable())
-  }
-
-  test("getTokenJaasParams with token should return scram module") {
-    addTokenToUGI()
-
-    val jaasParams = KafkaSecurityHelper.getTokenJaasParams(new SparkConf())
-
-    assert(jaasParams.contains("ScramLoginModule required"))
-    assert(jaasParams.contains("tokenauth=true"))
-    assert(jaasParams.contains(tokenId))
-    assert(jaasParams.contains(tokenPassword))
-  }
-}
diff --git a/external/kafka-0-10-token-provider/pom.xml b/external/kafka-0-10-token-provider/pom.xml
index b2abcd9..40ef1f7 100644
--- a/external/kafka-0-10-token-provider/pom.xml
+++ b/external/kafka-0-10-token-provider/pom.xml
@@ -53,6 +53,11 @@
       <version>${kafka.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
     </dependency>
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala
similarity index 89%
rename from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
rename to external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala
index 978dfe6..d24eb4a 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.kafka010
+package org.apache.spark.kafka010
 
 import java.{util => ju}
 
@@ -26,12 +26,11 @@ import org.apache.kafka.common.config.SaslConfigs
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Kafka
-import org.apache.spark.kafka010.KafkaTokenUtil
 
 /**
  * Class to conveniently update Kafka config params, while logging the changes
  */
-private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object])
+private[spark] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object])
     extends Logging {
   private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
 
@@ -58,9 +57,9 @@ private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map
     //   configuration.
     if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
       logDebug("JVM global security configuration detected, using it for login.")
-    } else if (KafkaSecurityHelper.isTokenAvailable()) {
+    } else if (KafkaTokenUtil.isTokenAvailable()) {
       logDebug("Delegation token detected, using it for login.")
-      val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
+      val jaasParams = KafkaTokenUtil.getTokenJaasParams(SparkEnv.get.conf)
       set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
       val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
       require(mechanism.startsWith("SCRAM"),
diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
index 574d58b..e5604f2 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
+import org.apache.kafka.common.security.scram.ScramLoginModule
 import org.apache.kafka.common.security.token.delegation.DelegationToken
 
 import org.apache.spark.SparkConf
@@ -154,7 +155,7 @@ private[spark] object KafkaTokenUtil extends Logging {
     }
   }
 
-  private[kafka010] def getKeytabJaasParams(sparkConf: SparkConf): String = {
+  private def getKeytabJaasParams(sparkConf: SparkConf): String = {
     val params =
       s"""
       |${getKrb5LoginModuleName} required
@@ -167,7 +168,7 @@ private[spark] object KafkaTokenUtil extends Logging {
     params
   }
 
-  def getTicketCacheJaasParams(sparkConf: SparkConf): String = {
+  private def getTicketCacheJaasParams(sparkConf: SparkConf): String = {
     val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
     require(serviceName.nonEmpty, "Kerberos service name must be defined")
 
@@ -208,4 +209,29 @@ private[spark] object KafkaTokenUtil extends Logging {
         dateFormat.format(tokenInfo.maxTimestamp)))
     }
   }
+
+  def isTokenAvailable(): Boolean = {
+    UserGroupInformation.getCurrentUser().getCredentials.getToken(
+      KafkaTokenUtil.TOKEN_SERVICE) != null
+  }
+
+  def getTokenJaasParams(sparkConf: SparkConf): String = {
+    val token = UserGroupInformation.getCurrentUser().getCredentials.getToken(
+      KafkaTokenUtil.TOKEN_SERVICE)
+    val username = new String(token.getIdentifier)
+    val password = new String(token.getPassword)
+
+    val loginModuleName = classOf[ScramLoginModule].getName
+    val params =
+      s"""
+      |$loginModuleName required
+      | tokenauth=true
+      | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"
+      | username="$username"
+      | password="$password";
+      """.stripMargin.replace("\n", "")
+    logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", "password=\"[hidden]\"")}")
+
+    params
+  }
 }
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala
similarity index 98%
rename from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala
rename to external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala
index 25ccca3..538486b 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.kafka010
+package org.apache.spark.kafka010
 
 import org.apache.kafka.common.config.SaslConfigs
 
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
similarity index 97%
rename from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala
rename to external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
index d0cefc4..bd9b873 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.kafka010
+package org.apache.spark.kafka010
 
 import java.{util => ju}
 import javax.security.auth.login.{AppConfigurationEntry, Configuration}
@@ -26,7 +26,6 @@ import org.mockito.Mockito.mock
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
-import org.apache.spark.kafka010.KafkaTokenUtil
 import org.apache.spark.kafka010.KafkaTokenUtil.KafkaDelegationTokenIdentifier
 
 /**
diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
index 5da6260..0a5af1d 100644
--- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
@@ -17,20 +17,17 @@
 
 package org.apache.spark.kafka010
 
-import java.{util => ju}
 import java.security.PrivilegedExceptionAction
-import javax.security.auth.login.{AppConfigurationEntry, Configuration}
 
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
-import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config._
 
-class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
   private val bootStrapServers = "127.0.0.1:0"
   private val trustStoreLocation = "/path/to/trustStore"
   private val trustStorePassword = "trustStoreSecret"
@@ -42,44 +39,11 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
 
   private var sparkConf: SparkConf = null
 
-  private class KafkaJaasConfiguration extends Configuration {
-    val entry =
-      new AppConfigurationEntry(
-        "DummyModule",
-        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
-        ju.Collections.emptyMap[String, Object]()
-      )
-
-    override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
-      if (name.equals("KafkaClient")) {
-        Array(entry)
-      } else {
-        null
-      }
-    }
-  }
-
   override def beforeEach(): Unit = {
     super.beforeEach()
     sparkConf = new SparkConf()
   }
 
-  override def afterEach(): Unit = {
-    try {
-      resetGlobalConfig()
-    } finally {
-      super.afterEach()
-    }
-  }
-
-  private def setGlobalKafkaClientConfig(): Unit = {
-    Configuration.setConfiguration(new KafkaJaasConfiguration)
-  }
-
-  private def resetGlobalConfig(): Unit = {
-    Configuration.setConfiguration(null)
-  }
-
   test("checkProxyUser with proxy current user should throw exception") {
     val realUser = UserGroupInformation.createUserForTesting("realUser", Array())
     UserGroupInformation.createProxyUserForTesting("proxyUser", realUser, Array()).doAs(
@@ -229,4 +193,25 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
 
     assert(KafkaTokenUtil.isGlobalJaasConfigurationProvided)
   }
+
+  test("isTokenAvailable without token should return false") {
+    assert(!KafkaTokenUtil.isTokenAvailable())
+  }
+
+  test("isTokenAvailable with token should return true") {
+    addTokenToUGI()
+
+    assert(KafkaTokenUtil.isTokenAvailable())
+  }
+
+  test("getTokenJaasParams with token should return scram module") {
+    addTokenToUGI()
+
+    val jaasParams = KafkaTokenUtil.getTokenJaasParams(new SparkConf())
+
+    assert(jaasParams.contains("ScramLoginModule required"))
+    assert(jaasParams.contains("tokenauth=true"))
+    assert(jaasParams.contains(tokenId))
+    assert(jaasParams.contains(tokenPassword))
+  }
 }
diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml
index 333572e..f78bdac 100644
--- a/external/kafka-0-10/pom.xml
+++ b/external/kafka-0-10/pom.xml
@@ -36,6 +36,11 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
+      <artifactId>spark-token-provider-kafka-0-10_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index 07960d1..3e32b59 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.kafka010.KafkaConfigUpdater
 
 /**
  * Choice of how to create and configure underlying Kafka Consumers on driver and executors.
@@ -54,6 +55,15 @@ abstract class ConsumerStrategy[K, V] {
    * checkpoint.
    */
   def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V]
+
+  /**
+   * Updates the parameters with security if needed.
+   * Added a function to hide internals and reduce code duplications because all strategy uses it.
+   */
+  protected def setAuthenticationConfigIfNeeded(kafkaParams: ju.Map[String, Object]) =
+    KafkaConfigUpdater("source", kafkaParams.asScala.toMap)
+      .setAuthenticationConfigIfNeeded()
+      .build()
 }
 
 /**
@@ -78,7 +88,8 @@ private case class Subscribe[K, V](
   def executorKafkaParams: ju.Map[String, Object] = kafkaParams
 
   def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
-    val consumer = new KafkaConsumer[K, V](kafkaParams)
+    val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
+    val consumer = new KafkaConsumer[K, V](updatedKafkaParams)
     consumer.subscribe(topics)
     val toSeek = if (currentOffsets.isEmpty) {
       offsets
@@ -134,7 +145,8 @@ private case class SubscribePattern[K, V](
   def executorKafkaParams: ju.Map[String, Object] = kafkaParams
 
   def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
-    val consumer = new KafkaConsumer[K, V](kafkaParams)
+    val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
+    val consumer = new KafkaConsumer[K, V](updatedKafkaParams)
     consumer.subscribe(pattern, new NoOpConsumerRebalanceListener())
     val toSeek = if (currentOffsets.isEmpty) {
       offsets
@@ -186,7 +198,8 @@ private case class Assign[K, V](
   def executorKafkaParams: ju.Map[String, Object] = kafkaParams
 
   def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
-    val consumer = new KafkaConsumer[K, V](kafkaParams)
+    val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
+    val consumer = new KafkaConsumer[K, V](updatedKafkaParams)
     consumer.assign(topicPartitions)
     val toSeek = if (currentOffsets.isEmpty) {
       offsets
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
index 68c5fe9..142e946 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
@@ -19,11 +19,14 @@ package org.apache.spark.streaming.kafka010
 
 import java.{util => ju}
 
+import scala.collection.JavaConverters._
+
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.kafka010.KafkaConfigUpdater
 
 private[kafka010] sealed trait KafkaDataConsumer[K, V] {
   /**
@@ -109,7 +112,10 @@ private[kafka010] class InternalKafkaConsumer[K, V](
 
   /** Create a KafkaConsumer to fetch records for `topicPartition` */
   private def createConsumer: KafkaConsumer[K, V] = {
-    val c = new KafkaConsumer[K, V](kafkaParams)
+    val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
+      .setAuthenticationConfigIfNeeded()
+      .build()
+    val c = new KafkaConsumer[K, V](updatedKafkaParams)
     val topics = ju.Arrays.asList(topicPartition)
     c.assign(topics)
     c


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org