Posted to commits@spark.apache.org by do...@apache.org on 2018/12/17 18:07:46 UTC

[spark] branch master updated: [SPARK-26371][SS] Increase kafka ConfigUpdater test coverage.

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a116e6  [SPARK-26371][SS] Increase kafka ConfigUpdater test coverage.
5a116e6 is described below

commit 5a116e669cb196f59ab3f8d06477f675cd0400f9
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Mon Dec 17 10:07:35 2018 -0800

    [SPARK-26371][SS] Increase kafka ConfigUpdater test coverage.
    
    ## What changes were proposed in this pull request?
    
    Since the Kafka delegation token work added logic to ConfigUpdater, it would be good to test it.
    This PR contains the following changes:
    * ConfigUpdater extracted to a separate file and renamed to KafkaConfigUpdater
    * mockito-core dependency added to kafka-0-10-sql
    * Unit tests added
    
    ## How was this patch tested?
    
    Existing and new unit tests, plus testing on a cluster.
    
    Closes #23321 from gaborgsomogyi/SPARK-26371.
    
    Authored-by: Gabor Somogyi <ga...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 external/kafka-0-10-sql/pom.xml                    |   5 +
 .../spark/sql/kafka010/KafkaConfigUpdater.scala    |  74 ++++++++++++++
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |  52 +---------
 .../sql/kafka010/KafkaConfigUpdaterSuite.scala     | 113 +++++++++++++++++++++
 ...rSuite.scala => KafkaDelegationTokenTest.scala} |  77 +++++++-------
 .../sql/kafka010/KafkaSecurityHelperSuite.scala    |  46 +--------
 6 files changed, 238 insertions(+), 129 deletions(-)

diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml
index de8731c..1c77906 100644
--- a/external/kafka-0-10-sql/pom.xml
+++ b/external/kafka-0-10-sql/pom.xml
@@ -107,6 +107,11 @@
         <scope>test</scope>
       </dependency>
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
       <scope>test</scope>
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
new file mode 100644
index 0000000..bc1b801
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.common.config.SaslConfigs
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.deploy.security.KafkaTokenUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Kafka
+
+/**
+ * Class to conveniently update Kafka config params, while logging the changes
+ */
+private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, String])
+    extends Logging {
+  private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
+
+  def set(key: String, value: Object): this.type = {
+    map.put(key, value)
+    logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
+    this
+  }
+
+  def setIfUnset(key: String, value: Object): this.type = {
+    if (!map.containsKey(key)) {
+      map.put(key, value)
+      logDebug(s"$module: Set $key to $value")
+    }
+    this
+  }
+
+  def setAuthenticationConfigIfNeeded(): this.type = {
+    // There are multiple ways to log in, and they are applied in the following order:
+    // - JVM global security provided -> try to log in with JVM global security configuration,
+    //   which can be configured for example with 'java.security.auth.login.config'.
+    //   No additional parameter is needed for this.
+    // - Token is provided -> try to log in with the SCRAM module using Kafka's dynamic JAAS
+    //   configuration.
+    if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
+      logDebug("JVM global security configuration detected, using it for login.")
+    } else if (KafkaSecurityHelper.isTokenAvailable()) {
+      logDebug("Delegation token detected, using it for login.")
+      val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
+      set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+      val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
+      require(mechanism.startsWith("SCRAM"),
+        "Delegation token works only with SCRAM mechanism.")
+      set(SaslConfigs.SASL_MECHANISM, mechanism)
+    }
+    this
+  }
+
+  def build(): ju.Map[String, Object] = map
+}
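
For context, a minimal usage sketch of the extracted builder (the object name and option values
below are illustrative only and not part of this commit): callers chain set/setIfUnset, optionally
setAuthenticationConfigIfNeeded, and finish with build(), as KafkaSourceProvider does in the next hunk.

    package org.apache.spark.sql.kafka010

    import java.{util => ju}

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    // Illustrative caller; mirrors how KafkaSourceProvider builds driver/executor params.
    object KafkaConfigUpdaterUsageSketch {
      def driverParams(specifiedKafkaParams: Map[String, String]): ju.Map[String, Object] = {
        val deserClassName = classOf[ByteArrayDeserializer].getName
        KafkaConfigUpdater("source", specifiedKafkaParams)
          .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)      // always set
          .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
          .setIfUnset(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")        // only if the user did not set it
          .setAuthenticationConfigIfNeeded()                                      // JAAS/token logic shown above
          .build()                                                                // java.util.Map for the Kafka client
      }
    }
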
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 4b8b5c0..5774ee7 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -24,13 +24,9 @@ import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
 
-import org.apache.spark.SparkEnv
-import org.apache.spark.deploy.security.KafkaTokenUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
@@ -483,7 +479,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   }
 
   def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]): ju.Map[String, Object] =
-    ConfigUpdater("source", specifiedKafkaParams)
+    KafkaConfigUpdater("source", specifiedKafkaParams)
       .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
       .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
 
@@ -506,7 +502,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   def kafkaParamsForExecutors(
       specifiedKafkaParams: Map[String, String],
       uniqueGroupId: String): ju.Map[String, Object] =
-    ConfigUpdater("executor", specifiedKafkaParams)
+    KafkaConfigUpdater("executor", specifiedKafkaParams)
       .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
       .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
 
@@ -537,48 +533,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
     s"${groupIdPrefix}-${UUID.randomUUID}-${metadataPath.hashCode}"
   }
 
-  /** Class to conveniently update Kafka config params, while logging the changes */
-  private case class ConfigUpdater(module: String, kafkaParams: Map[String, String]) {
-    private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
-
-    def set(key: String, value: Object): this.type = {
-      map.put(key, value)
-      logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
-      this
-    }
-
-    def setIfUnset(key: String, value: Object): ConfigUpdater = {
-      if (!map.containsKey(key)) {
-        map.put(key, value)
-        logDebug(s"$module: Set $key to $value")
-      }
-      this
-    }
-
-    def setAuthenticationConfigIfNeeded(): ConfigUpdater = {
-      // There are multiple possibilities to log in and applied in the following order:
-      // - JVM global security provided -> try to log in with JVM global security configuration
-      //   which can be configured for example with 'java.security.auth.login.config'.
-      //   For this no additional parameter needed.
-      // - Token is provided -> try to log in with scram module using kafka's dynamic JAAS
-      //   configuration.
-      if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
-        logDebug("JVM global security configuration detected, using it for login.")
-      } else if (KafkaSecurityHelper.isTokenAvailable()) {
-        logDebug("Delegation token detected, using it for login.")
-        val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
-        set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
-        val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
-        require(mechanism.startsWith("SCRAM"),
-          "Delegation token works only with SCRAM mechanism.")
-        set(SaslConfigs.SASL_MECHANISM, mechanism)
-      }
-      this
-    }
-
-    def build(): ju.Map[String, Object] = map
-  }
-
   private[kafka010] def kafkaParamsForProducer(
       parameters: Map[String, String]): ju.Map[String, Object] = {
     val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
@@ -596,7 +550,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
 
     val specifiedKafkaParams = convertToSpecifiedParams(parameters)
 
-    ConfigUpdater("executor", specifiedKafkaParams)
+    KafkaConfigUpdater("executor", specifiedKafkaParams)
       .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
       .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
       .setAuthenticationConfigIfNeeded()
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala
new file mode 100644
index 0000000..25ccca3
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.config.SaslConfigs
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.config._
+
+class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTest {
+  private val testModule = "testModule"
+  private val testKey = "testKey"
+  private val testValue = "testValue"
+  private val otherTestValue = "otherTestValue"
+
+  test("set should always set value") {
+    val params = Map.empty[String, String]
+
+    val updatedParams = KafkaConfigUpdater(testModule, params)
+      .set(testKey, testValue)
+      .build()
+
+    assert(updatedParams.size() === 1)
+    assert(updatedParams.get(testKey) === testValue)
+  }
+
+  test("setIfUnset without existing key should set value") {
+    val params = Map.empty[String, String]
+
+    val updatedParams = KafkaConfigUpdater(testModule, params)
+      .setIfUnset(testKey, testValue)
+      .build()
+
+    assert(updatedParams.size() === 1)
+    assert(updatedParams.get(testKey) === testValue)
+  }
+
+  test("setIfUnset with existing key should not set value") {
+    val params = Map[String, String](testKey -> testValue)
+
+    val updatedParams = KafkaConfigUpdater(testModule, params)
+      .setIfUnset(testKey, otherTestValue)
+      .build()
+
+    assert(updatedParams.size() === 1)
+    assert(updatedParams.get(testKey) === testValue)
+  }
+
+  test("setAuthenticationConfigIfNeeded with global security should not set values") {
+    val params = Map.empty[String, String]
+    setGlobalKafkaClientConfig()
+
+    val updatedParams = KafkaConfigUpdater(testModule, params)
+      .setAuthenticationConfigIfNeeded()
+      .build()
+
+    assert(updatedParams.size() === 0)
+  }
+
+  test("setAuthenticationConfigIfNeeded with token should set values") {
+    val params = Map.empty[String, String]
+    setSparkEnv(Map.empty)
+    addTokenToUGI()
+
+    val updatedParams = KafkaConfigUpdater(testModule, params)
+      .setAuthenticationConfigIfNeeded()
+      .build()
+
+    assert(updatedParams.size() === 2)
+    assert(updatedParams.containsKey(SaslConfigs.SASL_JAAS_CONFIG))
+    assert(updatedParams.get(SaslConfigs.SASL_MECHANISM) ===
+      Kafka.TOKEN_SASL_MECHANISM.defaultValueString)
+  }
+
+  test("setAuthenticationConfigIfNeeded with token and invalid mechanism should throw exception") {
+    val params = Map.empty[String, String]
+    setSparkEnv(Map[String, String](Kafka.TOKEN_SASL_MECHANISM.key -> "INVALID"))
+    addTokenToUGI()
+
+    val e = intercept[IllegalArgumentException] {
+      KafkaConfigUpdater(testModule, params)
+        .setAuthenticationConfigIfNeeded()
+        .build()
+    }
+
+    assert(e.getMessage.contains("Delegation token works only with SCRAM mechanism."))
+  }
+
+  test("setAuthenticationConfigIfNeeded without security should not set values") {
+    val params = Map.empty[String, String]
+
+    val updatedParams = KafkaConfigUpdater(testModule, params)
+      .setAuthenticationConfigIfNeeded()
+      .build()
+
+    assert(updatedParams.size() === 0)
+  }
+}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala
similarity index 51%
copy from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
copy to external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala
index fd9dee3..1899c65 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala
@@ -17,36 +17,59 @@
 
 package org.apache.spark.sql.kafka010
 
-import java.util.UUID
+import java.{util => ju}
+import javax.security.auth.login.{AppConfigurationEntry, Configuration}
 
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.Token
+import org.mockito.Mockito.{doReturn, mock}
 import org.scalatest.BeforeAndAfterEach
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
 import org.apache.spark.deploy.security.KafkaTokenUtil
 import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
 
-class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach {
-  private val tokenId = "tokenId" + UUID.randomUUID().toString
-  private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
-
-  private var sparkConf: SparkConf = null
-
-  override def beforeEach(): Unit = {
-    super.beforeEach()
-    sparkConf = new SparkConf()
+/**
+ * Trait that provides common functionality for Kafka delegation token related test suites.
+ */
+trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
+  self: SparkFunSuite =>
+
+  protected val tokenId = "tokenId" + ju.UUID.randomUUID().toString
+  protected val tokenPassword = "tokenPassword" + ju.UUID.randomUUID().toString
+
+  private class KafkaJaasConfiguration extends Configuration {
+    val entry =
+      new AppConfigurationEntry(
+        "DummyModule",
+        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+        ju.Collections.emptyMap[String, Object]()
+      )
+
+    override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
+      if (name.equals("KafkaClient")) {
+        Array(entry)
+      } else {
+        null
+      }
+    }
   }
 
   override def afterEach(): Unit = {
     try {
-      resetUGI
+      Configuration.setConfiguration(null)
+      UserGroupInformation.setLoginUser(null)
+      SparkEnv.set(null)
     } finally {
       super.afterEach()
     }
   }
 
-  private def addTokenToUGI(): Unit = {
+  protected def setGlobalKafkaClientConfig(): Unit = {
+    Configuration.setConfiguration(new KafkaJaasConfiguration)
+  }
+
+  protected def addTokenToUGI(): Unit = {
     val token = new Token[KafkaDelegationTokenIdentifier](
       tokenId.getBytes,
       tokenPassword.getBytes,
@@ -58,28 +81,10 @@ class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach {
     UserGroupInformation.getCurrentUser.addCredentials(creds)
   }
 
-  private def resetUGI: Unit = {
-    UserGroupInformation.setLoginUser(null)
-  }
-
-  test("isTokenAvailable without token should return false") {
-    assert(!KafkaSecurityHelper.isTokenAvailable())
-  }
-
-  test("isTokenAvailable with token should return true") {
-    addTokenToUGI()
-
-    assert(KafkaSecurityHelper.isTokenAvailable())
-  }
-
-  test("getTokenJaasParams with token should return scram module") {
-    addTokenToUGI()
-
-    val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf)
-
-    assert(jaasParams.contains("ScramLoginModule required"))
-    assert(jaasParams.contains("tokenauth=true"))
-    assert(jaasParams.contains(tokenId))
-    assert(jaasParams.contains(tokenPassword))
+  protected def setSparkEnv(settings: Traversable[(String, String)]): Unit = {
+    val conf = new SparkConf().setAll(settings)
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
   }
 }
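
For context, a minimal sketch of how a test suite can mix in the new trait (the suite name is
hypothetical; the pattern mirrors KafkaConfigUpdaterSuite above): setSparkEnv installs a mocked
SparkEnv carrying a SparkConf, and addTokenToUGI places a Kafka delegation token into the current
user's UGI credentials, with cleanup handled by the trait's afterEach.

    package org.apache.spark.sql.kafka010

    import org.apache.spark.SparkFunSuite

    // Hypothetical suite name; the mix-in pattern mirrors KafkaConfigUpdaterSuite above.
    class ExampleDelegationTokenSuite extends SparkFunSuite with KafkaDelegationTokenTest {
      test("delegation token is visible after addTokenToUGI") {
        setSparkEnv(Map.empty)  // mocked SparkEnv with an empty SparkConf
        addTokenToUGI()         // add a Kafka delegation token to the current UGI credentials

        assert(KafkaSecurityHelper.isTokenAvailable())
      }
    }
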
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
index fd9dee3..d908bbf 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
@@ -17,51 +17,9 @@
 
 package org.apache.spark.sql.kafka010
 
-import java.util.UUID
-
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-import org.apache.hadoop.security.token.Token
-import org.scalatest.BeforeAndAfterEach
-
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.security.KafkaTokenUtil
-import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
-
-class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach {
-  private val tokenId = "tokenId" + UUID.randomUUID().toString
-  private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
-
-  private var sparkConf: SparkConf = null
-
-  override def beforeEach(): Unit = {
-    super.beforeEach()
-    sparkConf = new SparkConf()
-  }
-
-  override def afterEach(): Unit = {
-    try {
-      resetUGI
-    } finally {
-      super.afterEach()
-    }
-  }
-
-  private def addTokenToUGI(): Unit = {
-    val token = new Token[KafkaDelegationTokenIdentifier](
-      tokenId.getBytes,
-      tokenPassword.getBytes,
-      KafkaTokenUtil.TOKEN_KIND,
-      KafkaTokenUtil.TOKEN_SERVICE
-    )
-    val creds = new Credentials()
-    creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token)
-    UserGroupInformation.getCurrentUser.addCredentials(creds)
-  }
-
-  private def resetUGI: Unit = {
-    UserGroupInformation.setLoginUser(null)
-  }
 
+class KafkaSecurityHelperSuite extends SparkFunSuite with KafkaDelegationTokenTest {
   test("isTokenAvailable without token should return false") {
     assert(!KafkaSecurityHelper.isTokenAvailable())
   }
@@ -75,7 +33,7 @@ class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("getTokenJaasParams with token should return scram module") {
     addTokenToUGI()
 
-    val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf)
+    val jaasParams = KafkaSecurityHelper.getTokenJaasParams(new SparkConf())
 
     assert(jaasParams.contains("ScramLoginModule required"))
     assert(jaasParams.contains("tokenauth=true"))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org