You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2019/04/11 23:40:05 UTC
[spark] branch master updated: [SPARK-27270][SS] Add Kafka dynamic
JAAS authentication debug possibility
This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 94adffa [SPARK-27270][SS] Add Kafka dynamic JAAS authentication debug possibility
94adffa is described below
commit 94adffa8b160c0f0317df9675d0a1534e5f804cd
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Thu Apr 11 16:39:40 2019 -0700
[SPARK-27270][SS] Add Kafka dynamic JAAS authentication debug possibility
## What changes were proposed in this pull request?
`Krb5LoginModule` supports a `debug` parameter, which is not yet supported on the Spark side. This configuration makes it easier to debug authentication issues against Kafka.
In this PR, the `Krb5LoginModule` debug flag is controlled by either `sun.security.krb5.debug` or, on IBM JVMs, `com.ibm.security.krb5.Krb5Debug`.
Additionally, some hardcoded values such as `ssl.truststore.location` were found, which could be error-prone if Kafka ever changes them; in such cases the constants defined by Kafka (`SslConfigs`) are now used instead.
## How was this patch tested?
Existing and additional unit tests, plus testing on a cluster.
Closes #24204 from gaborgsomogyi/SPARK-27270.
Authored-by: Gabor Somogyi <ga...@gmail.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
.../org/apache/spark/kafka010/KafkaTokenUtil.scala | 24 +++++++++----
.../spark/kafka010/KafkaTokenUtilSuite.scala | 40 +++++++++++++---------
2 files changed, 42 insertions(+), 22 deletions(-)
diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
index e5604f2..e0825e5 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions}
-import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
import org.apache.kafka.common.security.scram.ScramLoginModule
@@ -136,22 +136,22 @@ private[spark] object KafkaTokenUtil extends Logging {
private def setTrustStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = {
sparkConf.get(Kafka.TRUSTSTORE_LOCATION).foreach { truststoreLocation =>
- properties.put("ssl.truststore.location", truststoreLocation)
+ properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation)
}
sparkConf.get(Kafka.TRUSTSTORE_PASSWORD).foreach { truststorePassword =>
- properties.put("ssl.truststore.password", truststorePassword)
+ properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword)
}
}
private def setKeyStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = {
sparkConf.get(Kafka.KEYSTORE_LOCATION).foreach { keystoreLocation =>
- properties.put("ssl.keystore.location", keystoreLocation)
+ properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation)
}
sparkConf.get(Kafka.KEYSTORE_PASSWORD).foreach { keystorePassword =>
- properties.put("ssl.keystore.password", keystorePassword)
+ properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword)
}
sparkConf.get(Kafka.KEY_PASSWORD).foreach { keyPassword =>
- properties.put("ssl.key.password", keyPassword)
+ properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword)
}
}
@@ -159,6 +159,7 @@ private[spark] object KafkaTokenUtil extends Logging {
val params =
s"""
|${getKrb5LoginModuleName} required
+ | debug=${isGlobalKrbDebugEnabled()}
| useKeyTab=true
| serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"
| keyTab="${sparkConf.get(KEYTAB).get}"
@@ -175,6 +176,7 @@ private[spark] object KafkaTokenUtil extends Logging {
val params =
s"""
|${getKrb5LoginModuleName} required
+ | debug=${isGlobalKrbDebugEnabled()}
| useTicketCache=true
| serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}";
""".stripMargin.replace("\n", "")
@@ -194,6 +196,16 @@ private[spark] object KafkaTokenUtil extends Logging {
}
}
+ private def isGlobalKrbDebugEnabled(): Boolean = {
+ if (System.getProperty("java.vendor").contains("IBM")) {
+ val debug = System.getenv("com.ibm.security.krb5.Krb5Debug")
+ debug != null && debug.equalsIgnoreCase("all")
+ } else {
+ val debug = System.getenv("sun.security.krb5.debug")
+ debug != null && debug.equalsIgnoreCase("true")
+ }
+ }
+
private def printToken(token: DelegationToken): Unit = {
if (log.isDebugEnabled) {
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
index 0a5af1d..763f8db 100644
--- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
@@ -21,7 +21,7 @@ import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -83,11 +83,11 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
=== bootStrapServers)
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
=== SASL_PLAINTEXT.name)
- assert(!adminClientProperties.containsKey("ssl.truststore.location"))
- assert(!adminClientProperties.containsKey("ssl.truststore.password"))
- assert(!adminClientProperties.containsKey("ssl.keystore.location"))
- assert(!adminClientProperties.containsKey("ssl.keystore.password"))
- assert(!adminClientProperties.containsKey("ssl.key.password"))
+ assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
+ assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
+ assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+ assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
+ assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEY_PASSWORD_CONFIG))
}
test("createAdminClientProperties with SASL_SSL protocol should include truststore config") {
@@ -105,11 +105,13 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
=== bootStrapServers)
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
=== SASL_SSL.name)
- assert(adminClientProperties.get("ssl.truststore.location") === trustStoreLocation)
- assert(adminClientProperties.get("ssl.truststore.password") === trustStorePassword)
- assert(!adminClientProperties.containsKey("ssl.keystore.location"))
- assert(!adminClientProperties.containsKey("ssl.keystore.password"))
- assert(!adminClientProperties.containsKey("ssl.key.password"))
+ assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
+ === trustStoreLocation)
+ assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)
+ === trustStorePassword)
+ assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+ assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
+ assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEY_PASSWORD_CONFIG))
}
test("createAdminClientProperties with SSL protocol should include keystore and truststore " +
@@ -128,11 +130,13 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
=== bootStrapServers)
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
=== SSL.name)
- assert(adminClientProperties.get("ssl.truststore.location") === trustStoreLocation)
- assert(adminClientProperties.get("ssl.truststore.password") === trustStorePassword)
- assert(adminClientProperties.get("ssl.keystore.location") === keyStoreLocation)
- assert(adminClientProperties.get("ssl.keystore.password") === keyStorePassword)
- assert(adminClientProperties.get("ssl.key.password") === keyPassword)
+ assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
+ === trustStoreLocation)
+ assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)
+ === trustStorePassword)
+ assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG) === keyStoreLocation)
+ assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG) === keyStorePassword)
+ assert(adminClientProperties.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG) === keyPassword)
}
test("createAdminClientProperties with global config should not set dynamic jaas config") {
@@ -165,7 +169,10 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
assert(adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM))
val saslJaasConfig = adminClientProperties.getProperty(SaslConfigs.SASL_JAAS_CONFIG)
assert(saslJaasConfig.contains("Krb5LoginModule required"))
+ assert(saslJaasConfig.contains(s"debug="))
assert(saslJaasConfig.contains("useKeyTab=true"))
+ assert(saslJaasConfig.contains(s"""keyTab="$keytab""""))
+ assert(saslJaasConfig.contains(s"""principal="$principal""""))
}
test("createAdminClientProperties without keytab should set ticket cache dynamic jaas config") {
@@ -181,6 +188,7 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
assert(adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM))
val saslJaasConfig = adminClientProperties.getProperty(SaslConfigs.SASL_JAAS_CONFIG)
assert(saslJaasConfig.contains("Krb5LoginModule required"))
+ assert(saslJaasConfig.contains(s"debug="))
assert(saslJaasConfig.contains("useTicketCache=true"))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org