You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2019/04/11 23:40:05 UTC

[spark] branch master updated: [SPARK-27270][SS] Add Kafka dynamic JAAS authentication debug possibility

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 94adffa  [SPARK-27270][SS] Add Kafka dynamic JAAS authentication debug possibility
94adffa is described below

commit 94adffa8b160c0f0317df9675d0a1534e5f804cd
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Thu Apr 11 16:39:40 2019 -0700

    [SPARK-27270][SS] Add Kafka dynamic JAAS authentication debug possibility
    
    ## What changes were proposed in this pull request?
    
    `Krb5LoginModule` supports debug parameter which is not yet supported from Spark side. This configuration makes it easier to debug authentication issues against Kafka.
    
    In this PR `Krb5LoginModule` debug flag controlled by either `sun.security.krb5.debug` or `com.ibm.security.krb5.Krb5Debug`.
    
    Additionally found some hardcoded values like `ssl.truststore.location`, etc... which could be error prone if Kafka changes it so in such cases Kafka define used.
    
    ## How was this patch tested?
    
    Existing + additional unit tests + on cluster.
    
    Closes #24204 from gaborgsomogyi/SPARK-27270.
    
    Authored-by: Gabor Somogyi <ga...@gmail.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../org/apache/spark/kafka010/KafkaTokenUtil.scala | 24 +++++++++----
 .../spark/kafka010/KafkaTokenUtilSuite.scala       | 40 +++++++++++++---------
 2 files changed, 42 insertions(+), 22 deletions(-)

diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
index e5604f2..e0825e5 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions}
-import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
 import org.apache.kafka.common.security.scram.ScramLoginModule
@@ -136,22 +136,22 @@ private[spark] object KafkaTokenUtil extends Logging {
 
   private def setTrustStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = {
     sparkConf.get(Kafka.TRUSTSTORE_LOCATION).foreach { truststoreLocation =>
-      properties.put("ssl.truststore.location", truststoreLocation)
+      properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation)
     }
     sparkConf.get(Kafka.TRUSTSTORE_PASSWORD).foreach { truststorePassword =>
-      properties.put("ssl.truststore.password", truststorePassword)
+      properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword)
     }
   }
 
   private def setKeyStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = {
     sparkConf.get(Kafka.KEYSTORE_LOCATION).foreach { keystoreLocation =>
-      properties.put("ssl.keystore.location", keystoreLocation)
+      properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation)
     }
     sparkConf.get(Kafka.KEYSTORE_PASSWORD).foreach { keystorePassword =>
-      properties.put("ssl.keystore.password", keystorePassword)
+      properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword)
     }
     sparkConf.get(Kafka.KEY_PASSWORD).foreach { keyPassword =>
-      properties.put("ssl.key.password", keyPassword)
+      properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword)
     }
   }
 
@@ -159,6 +159,7 @@ private[spark] object KafkaTokenUtil extends Logging {
     val params =
       s"""
       |${getKrb5LoginModuleName} required
+      | debug=${isGlobalKrbDebugEnabled()}
       | useKeyTab=true
       | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"
       | keyTab="${sparkConf.get(KEYTAB).get}"
@@ -175,6 +176,7 @@ private[spark] object KafkaTokenUtil extends Logging {
     val params =
       s"""
       |${getKrb5LoginModuleName} required
+      | debug=${isGlobalKrbDebugEnabled()}
       | useTicketCache=true
       | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}";
       """.stripMargin.replace("\n", "")
@@ -194,6 +196,16 @@ private[spark] object KafkaTokenUtil extends Logging {
     }
   }
 
+  private def isGlobalKrbDebugEnabled(): Boolean = {
+    if (System.getProperty("java.vendor").contains("IBM")) {
+      val debug = System.getenv("com.ibm.security.krb5.Krb5Debug")
+      debug != null && debug.equalsIgnoreCase("all")
+    } else {
+      val debug = System.getenv("sun.security.krb5.debug")
+      debug != null && debug.equalsIgnoreCase("true")
+    }
+  }
+
   private def printToken(token: DelegationToken): Unit = {
     if (log.isDebugEnabled) {
       val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
index 0a5af1d..763f8db 100644
--- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
@@ -21,7 +21,7 @@ import java.security.PrivilegedExceptionAction
 
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
 import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -83,11 +83,11 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
       === bootStrapServers)
     assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
       === SASL_PLAINTEXT.name)
-    assert(!adminClientProperties.containsKey("ssl.truststore.location"))
-    assert(!adminClientProperties.containsKey("ssl.truststore.password"))
-    assert(!adminClientProperties.containsKey("ssl.keystore.location"))
-    assert(!adminClientProperties.containsKey("ssl.keystore.password"))
-    assert(!adminClientProperties.containsKey("ssl.key.password"))
+    assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
+    assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
+    assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+    assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
+    assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEY_PASSWORD_CONFIG))
   }
 
   test("createAdminClientProperties with SASL_SSL protocol should include truststore config") {
@@ -105,11 +105,13 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
       === bootStrapServers)
     assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
       === SASL_SSL.name)
-    assert(adminClientProperties.get("ssl.truststore.location") === trustStoreLocation)
-    assert(adminClientProperties.get("ssl.truststore.password") === trustStorePassword)
-    assert(!adminClientProperties.containsKey("ssl.keystore.location"))
-    assert(!adminClientProperties.containsKey("ssl.keystore.password"))
-    assert(!adminClientProperties.containsKey("ssl.key.password"))
+    assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
+      === trustStoreLocation)
+    assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)
+      === trustStorePassword)
+    assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+    assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
+    assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEY_PASSWORD_CONFIG))
   }
 
   test("createAdminClientProperties with SSL protocol should include keystore and truststore " +
@@ -128,11 +130,13 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
       === bootStrapServers)
     assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
       === SSL.name)
-    assert(adminClientProperties.get("ssl.truststore.location") === trustStoreLocation)
-    assert(adminClientProperties.get("ssl.truststore.password") === trustStorePassword)
-    assert(adminClientProperties.get("ssl.keystore.location") === keyStoreLocation)
-    assert(adminClientProperties.get("ssl.keystore.password") === keyStorePassword)
-    assert(adminClientProperties.get("ssl.key.password") === keyPassword)
+    assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
+      === trustStoreLocation)
+    assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)
+      === trustStorePassword)
+    assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG) === keyStoreLocation)
+    assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG) === keyStorePassword)
+    assert(adminClientProperties.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG) === keyPassword)
   }
 
   test("createAdminClientProperties with global config should not set dynamic jaas config") {
@@ -165,7 +169,10 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
     assert(adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM))
     val saslJaasConfig = adminClientProperties.getProperty(SaslConfigs.SASL_JAAS_CONFIG)
     assert(saslJaasConfig.contains("Krb5LoginModule required"))
+    assert(saslJaasConfig.contains(s"debug="))
     assert(saslJaasConfig.contains("useKeyTab=true"))
+    assert(saslJaasConfig.contains(s"""keyTab="$keytab""""))
+    assert(saslJaasConfig.contains(s"""principal="$principal""""))
   }
 
   test("createAdminClientProperties without keytab should set ticket cache dynamic jaas config") {
@@ -181,6 +188,7 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
     assert(adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM))
     val saslJaasConfig = adminClientProperties.getProperty(SaslConfigs.SASL_JAAS_CONFIG)
     assert(saslJaasConfig.contains("Krb5LoginModule required"))
+    assert(saslJaasConfig.contains(s"debug="))
     assert(saslJaasConfig.contains("useTicketCache=true"))
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org