Posted to commits@spark.apache.org by do...@apache.org on 2018/12/17 18:07:46 UTC
[spark] branch master updated: [SPARK-26371][SS] Increase kafka ConfigUpdater test coverage.
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5a116e6 [SPARK-26371][SS] Increase kafka ConfigUpdater test coverage.
5a116e6 is described below
commit 5a116e669cb196f59ab3f8d06477f675cd0400f9
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Mon Dec 17 10:07:35 2018 -0800
[SPARK-26371][SS] Increase kafka ConfigUpdater test coverage.
## What changes were proposed in this pull request?
Since the Kafka delegation token work added logic to ConfigUpdater, that logic should be covered by tests.
This PR contains the following changes:
* ConfigUpdater extracted to a separate file and renamed to KafkaConfigUpdater
* mockito-core dependency added to kafka-0-10-sql
* Unit tests added
## How was this patch tested?
Existing and new unit tests, plus testing on a cluster.
Closes #23321 from gaborgsomogyi/SPARK-26371.
Authored-by: Gabor Somogyi <ga...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
external/kafka-0-10-sql/pom.xml | 5 +
.../spark/sql/kafka010/KafkaConfigUpdater.scala | 74 ++++++++++++++
.../spark/sql/kafka010/KafkaSourceProvider.scala | 52 +---------
.../sql/kafka010/KafkaConfigUpdaterSuite.scala | 113 +++++++++++++++++++++
...rSuite.scala => KafkaDelegationTokenTest.scala} | 77 +++++++-------
.../sql/kafka010/KafkaSecurityHelperSuite.scala | 46 +--------
6 files changed, 238 insertions(+), 129 deletions(-)
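
Before the diff itself, a minimal usage sketch of the extracted builder (class and method names come from the KafkaConfigUpdater added below; the input map and values are illustrative):

    import org.apache.kafka.clients.consumer.ConsumerConfig

    // Illustrative only: chain the updater calls and materialize a java.util.Map.
    val kafkaParams = KafkaConfigUpdater("source", Map("bootstrap.servers" -> "broker:9092"))
      .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer].getName)
      .setIfUnset(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .setAuthenticationConfigIfNeeded()
      .build()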
diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml
index de8731c..1c77906 100644
--- a/external/kafka-0-10-sql/pom.xml
+++ b/external/kafka-0-10-sql/pom.xml
@@ -107,6 +107,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
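
The new mockito-core test dependency exists to stub SparkEnv in the suites below. A minimal sketch of that stubbing style (it mirrors setSparkEnv in KafkaDelegationTokenTest later in this diff):

    import org.mockito.Mockito.{doReturn, mock}
    import org.apache.spark.{SparkConf, SparkEnv}

    // Make SparkEnv.get.conf return a controlled SparkConf in tests.
    val env = mock(classOf[SparkEnv])
    doReturn(new SparkConf()).when(env).conf
    SparkEnv.set(env)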
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
new file mode 100644
index 0000000..bc1b801
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.common.config.SaslConfigs
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.deploy.security.KafkaTokenUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Kafka
+
+/**
+ * Class to conveniently update Kafka config params, while logging the changes
+ */
+private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, String])
+ extends Logging {
+ private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
+
+ def set(key: String, value: Object): this.type = {
+ map.put(key, value)
+ logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
+ this
+ }
+
+ def setIfUnset(key: String, value: Object): this.type = {
+ if (!map.containsKey(key)) {
+ map.put(key, value)
+ logDebug(s"$module: Set $key to $value")
+ }
+ this
+ }
+
+ def setAuthenticationConfigIfNeeded(): this.type = {
+ // There are multiple ways to log in, applied in the following order:
+ // - JVM global security provided -> try to log in with the JVM global security
+ //   configuration, which can be set for example with 'java.security.auth.login.config'.
+ //   No additional parameter is needed for this.
+ // - Token is provided -> try to log in with the scram module using Kafka's dynamic JAAS
+ //   configuration.
+ if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
+ logDebug("JVM global security configuration detected, using it for login.")
+ } else if (KafkaSecurityHelper.isTokenAvailable()) {
+ logDebug("Delegation token detected, using it for login.")
+ val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
+ set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+ val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
+ require(mechanism.startsWith("SCRAM"),
+ "Delegation token works only with SCRAM mechanism.")
+ set(SaslConfigs.SASL_MECHANISM, mechanism)
+ }
+ this
+ }
+
+ def build(): ju.Map[String, Object] = map
+}
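
The authentication helper prefers a JVM-global JAAS configuration and only falls back to the delegation token. With a token present, the net effect is equivalent to the following sketch (the SaslConfigs keys are real Kafka client constants; the values here are illustrative, not exact output):

    import org.apache.kafka.common.config.SaslConfigs

    // Illustrative equivalent of the token branch in setAuthenticationConfigIfNeeded:
    val updated = KafkaConfigUpdater("source", Map.empty)
      .set(SaslConfigs.SASL_JAAS_CONFIG,
        "...ScramLoginModule required tokenauth=true ...;") // dynamic JAAS string
      .set(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512")     // must start with "SCRAM"
      .build()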
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 4b8b5c0..5774ee7 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -24,13 +24,9 @@ import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
-import org.apache.spark.SparkEnv
-import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
@@ -483,7 +479,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
}
def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]): ju.Map[String, Object] =
- ConfigUpdater("source", specifiedKafkaParams)
+ KafkaConfigUpdater("source", specifiedKafkaParams)
.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
@@ -506,7 +502,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
def kafkaParamsForExecutors(
specifiedKafkaParams: Map[String, String],
uniqueGroupId: String): ju.Map[String, Object] =
- ConfigUpdater("executor", specifiedKafkaParams)
+ KafkaConfigUpdater("executor", specifiedKafkaParams)
.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
@@ -537,48 +533,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
s"${groupIdPrefix}-${UUID.randomUUID}-${metadataPath.hashCode}"
}
- /** Class to conveniently update Kafka config params, while logging the changes */
- private case class ConfigUpdater(module: String, kafkaParams: Map[String, String]) {
- private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
-
- def set(key: String, value: Object): this.type = {
- map.put(key, value)
- logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
- this
- }
-
- def setIfUnset(key: String, value: Object): ConfigUpdater = {
- if (!map.containsKey(key)) {
- map.put(key, value)
- logDebug(s"$module: Set $key to $value")
- }
- this
- }
-
- def setAuthenticationConfigIfNeeded(): ConfigUpdater = {
- // There are multiple possibilities to log in and applied in the following order:
- // - JVM global security provided -> try to log in with JVM global security configuration
- // which can be configured for example with 'java.security.auth.login.config'.
- // For this no additional parameter needed.
- // - Token is provided -> try to log in with scram module using kafka's dynamic JAAS
- // configuration.
- if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
- logDebug("JVM global security configuration detected, using it for login.")
- } else if (KafkaSecurityHelper.isTokenAvailable()) {
- logDebug("Delegation token detected, using it for login.")
- val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
- set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
- val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
- require(mechanism.startsWith("SCRAM"),
- "Delegation token works only with SCRAM mechanism.")
- set(SaslConfigs.SASL_MECHANISM, mechanism)
- }
- this
- }
-
- def build(): ju.Map[String, Object] = map
- }
-
private[kafka010] def kafkaParamsForProducer(
parameters: Map[String, String]): ju.Map[String, Object] = {
val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
@@ -596,7 +550,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
val specifiedKafkaParams = convertToSpecifiedParams(parameters)
- ConfigUpdater("executor", specifiedKafkaParams)
+ KafkaConfigUpdater("executor", specifiedKafkaParams)
.set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
.set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
.setAuthenticationConfigIfNeeded()
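
After the extraction, the call sites above change only in the class name. A hedged sketch of a driver-side invocation (callable from within the kafka010 package; the input map is illustrative):

    import org.apache.kafka.clients.consumer.ConsumerConfig

    // Illustrative: the chain forces byte-array deserializers regardless of input.
    val driverParams = KafkaSourceProvider.kafkaParamsForDriver(
      Map("bootstrap.servers" -> "broker:9092"))
    assert(driverParams.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) != null)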
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala
new file mode 100644
index 0000000..25ccca3
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.config.SaslConfigs
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.config._
+
+class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTest {
+ private val testModule = "testModule"
+ private val testKey = "testKey"
+ private val testValue = "testValue"
+ private val otherTestValue = "otherTestValue"
+
+ test("set should always set value") {
+ val params = Map.empty[String, String]
+
+ val updatedParams = KafkaConfigUpdater(testModule, params)
+ .set(testKey, testValue)
+ .build()
+
+ assert(updatedParams.size() === 1)
+ assert(updatedParams.get(testKey) === testValue)
+ }
+
+ test("setIfUnset without existing key should set value") {
+ val params = Map.empty[String, String]
+
+ val updatedParams = KafkaConfigUpdater(testModule, params)
+ .setIfUnset(testKey, testValue)
+ .build()
+
+ assert(updatedParams.size() === 1)
+ assert(updatedParams.get(testKey) === testValue)
+ }
+
+ test("setIfUnset with existing key should not set value") {
+ val params = Map[String, String](testKey -> testValue)
+
+ val updatedParams = KafkaConfigUpdater(testModule, params)
+ .setIfUnset(testKey, otherTestValue)
+ .build()
+
+ assert(updatedParams.size() === 1)
+ assert(updatedParams.get(testKey) === testValue)
+ }
+
+ test("setAuthenticationConfigIfNeeded with global security should not set values") {
+ val params = Map.empty[String, String]
+ setGlobalKafkaClientConfig()
+
+ val updatedParams = KafkaConfigUpdater(testModule, params)
+ .setAuthenticationConfigIfNeeded()
+ .build()
+
+ assert(updatedParams.size() === 0)
+ }
+
+ test("setAuthenticationConfigIfNeeded with token should set values") {
+ val params = Map.empty[String, String]
+ setSparkEnv(Map.empty)
+ addTokenToUGI()
+
+ val updatedParams = KafkaConfigUpdater(testModule, params)
+ .setAuthenticationConfigIfNeeded()
+ .build()
+
+ assert(updatedParams.size() === 2)
+ assert(updatedParams.containsKey(SaslConfigs.SASL_JAAS_CONFIG))
+ assert(updatedParams.get(SaslConfigs.SASL_MECHANISM) ===
+ Kafka.TOKEN_SASL_MECHANISM.defaultValueString)
+ }
+
+ test("setAuthenticationConfigIfNeeded with token and invalid mechanism should throw exception") {
+ val params = Map.empty[String, String]
+ setSparkEnv(Map[String, String](Kafka.TOKEN_SASL_MECHANISM.key -> "INVALID"))
+ addTokenToUGI()
+
+ val e = intercept[IllegalArgumentException] {
+ KafkaConfigUpdater(testModule, params)
+ .setAuthenticationConfigIfNeeded()
+ .build()
+ }
+
+ assert(e.getMessage.contains("Delegation token works only with SCRAM mechanism."))
+ }
+
+ test("setAuthenticationConfigIfNeeded without security should not set values") {
+ val params = Map.empty[String, String]
+
+ val updatedParams = KafkaConfigUpdater(testModule, params)
+ .setAuthenticationConfigIfNeeded()
+ .build()
+
+ assert(updatedParams.size() === 0)
+ }
+}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala
similarity index 51%
copy from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
copy to external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala
index fd9dee3..1899c65 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala
@@ -17,36 +17,59 @@
package org.apache.spark.sql.kafka010
-import java.util.UUID
+import java.{util => ju}
+import javax.security.auth.login.{AppConfigurationEntry, Configuration}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.Token
+import org.mockito.Mockito.{doReturn, mock}
import org.scalatest.BeforeAndAfterEach
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
-class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach {
- private val tokenId = "tokenId" + UUID.randomUUID().toString
- private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
-
- private var sparkConf: SparkConf = null
-
- override def beforeEach(): Unit = {
- super.beforeEach()
- sparkConf = new SparkConf()
+/**
+ * A trait that provides helper functionality for Kafka delegation token related test suites.
+ */
+trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
+ self: SparkFunSuite =>
+
+ protected val tokenId = "tokenId" + ju.UUID.randomUUID().toString
+ protected val tokenPassword = "tokenPassword" + ju.UUID.randomUUID().toString
+
+ private class KafkaJaasConfiguration extends Configuration {
+ val entry =
+ new AppConfigurationEntry(
+ "DummyModule",
+ AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+ ju.Collections.emptyMap[String, Object]()
+ )
+
+ override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
+ if (name.equals("KafkaClient")) {
+ Array(entry)
+ } else {
+ null
+ }
+ }
}
override def afterEach(): Unit = {
try {
- resetUGI
+ Configuration.setConfiguration(null)
+ UserGroupInformation.setLoginUser(null)
+ SparkEnv.set(null)
} finally {
super.afterEach()
}
}
- private def addTokenToUGI(): Unit = {
+ protected def setGlobalKafkaClientConfig(): Unit = {
+ Configuration.setConfiguration(new KafkaJaasConfiguration)
+ }
+
+ protected def addTokenToUGI(): Unit = {
val token = new Token[KafkaDelegationTokenIdentifier](
tokenId.getBytes,
tokenPassword.getBytes,
@@ -58,28 +81,10 @@ class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach {
UserGroupInformation.getCurrentUser.addCredentials(creds)
}
- private def resetUGI: Unit = {
- UserGroupInformation.setLoginUser(null)
- }
-
- test("isTokenAvailable without token should return false") {
- assert(!KafkaSecurityHelper.isTokenAvailable())
- }
-
- test("isTokenAvailable with token should return true") {
- addTokenToUGI()
-
- assert(KafkaSecurityHelper.isTokenAvailable())
- }
-
- test("getTokenJaasParams with token should return scram module") {
- addTokenToUGI()
-
- val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf)
-
- assert(jaasParams.contains("ScramLoginModule required"))
- assert(jaasParams.contains("tokenauth=true"))
- assert(jaasParams.contains(tokenId))
- assert(jaasParams.contains(tokenPassword))
+ protected def setSparkEnv(settings: Traversable[(String, String)]): Unit = {
+ val conf = new SparkConf().setAll(settings)
+ val env = mock(classOf[SparkEnv])
+ doReturn(conf).when(env).conf
+ SparkEnv.set(env)
}
}
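
Suites adopt the new trait by mixing it in next to SparkFunSuite; a minimal hypothetical example (the helper names come from the trait above, the suite itself is invented for illustration):

    class MyKafkaAuthSuite extends SparkFunSuite with KafkaDelegationTokenTest {
      test("behaves with a delegation token") {
        setSparkEnv(Map.empty)  // mocked SparkEnv with an empty SparkConf
        addTokenToUGI()         // puts a Kafka delegation token on the current UGI
        // ... exercise token-dependent code; afterEach() resets JAAS, UGI and SparkEnv
      }
    }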
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
index fd9dee3..d908bbf 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
@@ -17,51 +17,9 @@
package org.apache.spark.sql.kafka010
-import java.util.UUID
-
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-import org.apache.hadoop.security.token.Token
-import org.scalatest.BeforeAndAfterEach
-
import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.security.KafkaTokenUtil
-import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
-
-class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach {
- private val tokenId = "tokenId" + UUID.randomUUID().toString
- private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
-
- private var sparkConf: SparkConf = null
-
- override def beforeEach(): Unit = {
- super.beforeEach()
- sparkConf = new SparkConf()
- }
-
- override def afterEach(): Unit = {
- try {
- resetUGI
- } finally {
- super.afterEach()
- }
- }
-
- private def addTokenToUGI(): Unit = {
- val token = new Token[KafkaDelegationTokenIdentifier](
- tokenId.getBytes,
- tokenPassword.getBytes,
- KafkaTokenUtil.TOKEN_KIND,
- KafkaTokenUtil.TOKEN_SERVICE
- )
- val creds = new Credentials()
- creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token)
- UserGroupInformation.getCurrentUser.addCredentials(creds)
- }
-
- private def resetUGI: Unit = {
- UserGroupInformation.setLoginUser(null)
- }
+class KafkaSecurityHelperSuite extends SparkFunSuite with KafkaDelegationTokenTest {
test("isTokenAvailable without token should return false") {
assert(!KafkaSecurityHelper.isTokenAvailable())
}
@@ -75,7 +33,7 @@ class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach {
test("getTokenJaasParams with token should return scram module") {
addTokenToUGI()
- val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf)
+ val jaasParams = KafkaSecurityHelper.getTokenJaasParams(new SparkConf())
assert(jaasParams.contains("ScramLoginModule required"))
assert(jaasParams.contains("tokenauth=true"))