You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by GitBox <gi...@apache.org> on 2019/01/15 18:03:27 UTC
[spark] Diff for: [GitHub] asfgit closed pull request #23511:
[SPARK-26592][SS] Throw exception when kafka delegation token tried to
obtain with proxy user
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
index aec0f72feb3c1..f3638533e1b7d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
@@ -17,12 +17,13 @@
package org.apache.spark.deploy.security
-import java.{ util => ju }
+import java.{util => ju}
import java.text.SimpleDateFormat
import scala.util.control.NonFatal
import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
import org.apache.kafka.clients.CommonClientConfigs
@@ -33,6 +34,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, S
import org.apache.kafka.common.security.token.delegation.DelegationToken
import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
@@ -45,6 +47,8 @@ private[spark] object KafkaTokenUtil extends Logging {
}
private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = {
+ checkProxyUser()
+
val adminClient = AdminClient.create(createAdminClientProperties(sparkConf))
val createDelegationTokenOptions = new CreateDelegationTokenOptions()
val createResult = adminClient.createDelegationToken(createDelegationTokenOptions)
@@ -59,6 +63,14 @@ private[spark] object KafkaTokenUtil extends Logging {
), token.tokenInfo.expiryTimestamp)
}
+ private[security] def checkProxyUser(): Unit = {
+ val currentUser = UserGroupInformation.getCurrentUser()
+ // Obtaining delegation token for proxy user is planned but not yet implemented
+ // See https://issues.apache.org/jira/browse/KAFKA-6945
+ require(!SparkHadoopUtil.get.isProxyUser(currentUser), "Obtaining delegation token for proxy " +
+ "user is not yet supported.")
+ }
+
private[security] def createAdminClientProperties(sparkConf: SparkConf): ju.Properties = {
val adminClientProperties = new ju.Properties
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
index 18aa537b3a51d..daa7e544cc9c6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
@@ -17,9 +17,11 @@
package org.apache.spark.deploy.security
-import java.{ util => ju }
+import java.{util => ju}
+import java.security.PrivilegedExceptionAction
import javax.security.auth.login.{AppConfigurationEntry, Configuration}
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
@@ -78,6 +80,21 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
Configuration.setConfiguration(null)
}
+ test("checkProxyUser with proxy current user should throw exception") {
+ val realUser = UserGroupInformation.createUserForTesting("realUser", Array())
+ UserGroupInformation.createProxyUserForTesting("proxyUser", realUser, Array()).doAs(
+ new PrivilegedExceptionAction[Unit]() {
+ override def run(): Unit = {
+ val thrown = intercept[IllegalArgumentException] {
+ KafkaTokenUtil.checkProxyUser()
+ }
+ assert(thrown.getMessage contains
+ "Obtaining delegation token for proxy user is not yet supported.")
+ }
+ }
+ )
+ }
+
test("createAdminClientProperties without bootstrap servers should throw exception") {
val thrown = intercept[IllegalArgumentException] {
KafkaTokenUtil.createAdminClientProperties(sparkConf)
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org