You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by GitBox <gi...@apache.org> on 2019/01/15 18:03:27 UTC

[spark] Diff for: [GitHub] asfgit closed pull request #23511: [SPARK-26592][SS] Throw exception when kafka delegation token tried to obtain with proxy user

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
index aec0f72feb3c1..f3638533e1b7d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.deploy.security
 
-import java.{ util => ju }
+import java.{util => ju}
 import java.text.SimpleDateFormat
 
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
 import org.apache.kafka.clients.CommonClientConfigs
@@ -33,6 +34,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, S
 import org.apache.kafka.common.security.token.delegation.DelegationToken
 
 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 
@@ -45,6 +47,8 @@ private[spark] object KafkaTokenUtil extends Logging {
   }
 
   private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = {
+    checkProxyUser()
+
     val adminClient = AdminClient.create(createAdminClientProperties(sparkConf))
     val createDelegationTokenOptions = new CreateDelegationTokenOptions()
     val createResult = adminClient.createDelegationToken(createDelegationTokenOptions)
@@ -59,6 +63,14 @@ private[spark] object KafkaTokenUtil extends Logging {
     ), token.tokenInfo.expiryTimestamp)
   }
 
+  private[security] def checkProxyUser(): Unit = {
+    val currentUser = UserGroupInformation.getCurrentUser()
+    // Obtaining delegation token for proxy user is planned but not yet implemented
+    // See https://issues.apache.org/jira/browse/KAFKA-6945
+    require(!SparkHadoopUtil.get.isProxyUser(currentUser), "Obtaining delegation token for proxy " +
+      "user is not yet supported.")
+  }
+
   private[security] def createAdminClientProperties(sparkConf: SparkConf): ju.Properties = {
     val adminClientProperties = new ju.Properties
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
index 18aa537b3a51d..daa7e544cc9c6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.deploy.security
 
-import java.{ util => ju }
+import java.{util => ju}
+import java.security.PrivilegedExceptionAction
 import javax.security.auth.login.{AppConfigurationEntry, Configuration}
 
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
@@ -78,6 +80,21 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
     Configuration.setConfiguration(null)
   }
 
+  test("checkProxyUser with proxy current user should throw exception") {
+    val realUser = UserGroupInformation.createUserForTesting("realUser", Array())
+    UserGroupInformation.createProxyUserForTesting("proxyUser", realUser, Array()).doAs(
+      new PrivilegedExceptionAction[Unit]() {
+        override def run(): Unit = {
+          val thrown = intercept[IllegalArgumentException] {
+            KafkaTokenUtil.checkProxyUser()
+          }
+          assert(thrown.getMessage contains
+            "Obtaining delegation token for proxy user is not yet supported.")
+        }
+      }
+    )
+  }
+
   test("createAdminClientProperties without bootstrap servers should throw exception") {
     val thrown = intercept[IllegalArgumentException] {
       KafkaTokenUtil.createAdminClientProperties(sparkConf)


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org