Posted to commits@spark.apache.org by va...@apache.org on 2019/07/11 16:36:57 UTC

[spark] branch master updated: [SPARK-28055][SS][DSTREAMS] Add delegation token custom AdminClient configurations.

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d47c219  [SPARK-28055][SS][DSTREAMS] Add delegation token custom AdminClient configurations.
d47c219 is described below

commit d47c219f94f478b4b90bf6f74f78762ea301ebf9
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Thu Jul 11 09:36:24 2019 -0700

    [SPARK-28055][SS][DSTREAMS] Add delegation token custom AdminClient configurations.
    
    ## What changes were proposed in this pull request?
    
    At the moment Kafka delegation tokens are fetched through `AdminClient`, but there is no way to pass custom configuration parameters to it. The source and sink [options](https://spark.apache.org/docs/2.4.3/structured-streaming-kafka-integration.html#kafka-specific-configurations) already make it possible to add custom Kafka configurations.
    
    In this PR I've added the same possibility to `AdminClient`.
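    
    For illustration, a minimal sketch of setting a custom AdminClient property through `SparkConf` (the cluster name `cluster1` and the `retries` value are hypothetical):
    
    ```scala
    import org.apache.spark.SparkConf
    
    // Everything after the "kafka." part of the key is forwarded to the
    // AdminClient used for delegation token fetching.
    val conf = new SparkConf()
      .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "kafkaAuth:9092")
      .set("spark.kafka.clusters.cluster1.kafka.retries", "1")
    ```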
    
    ## How was this patch tested?
    
    Existing + added unit tests.
    
    ```
    cd docs/
    SKIP_API=1 jekyll build
    ```
    Manual webpage check.
    
    Closes #24875 from gaborgsomogyi/SPARK-28055.
    
    Authored-by: Gabor Somogyi <ga...@gmail.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 docs/structured-streaming-kafka-integration.md         |  5 +++++
 .../apache/spark/kafka010/KafkaTokenSparkConf.scala    | 18 ++++++++++++------
 .../org/apache/spark/kafka010/KafkaTokenUtil.scala     | 10 ++++++++++
 .../spark/kafka010/KafkaDelegationTokenTest.scala      |  6 ++++--
 .../spark/kafka010/KafkaTokenSparkConfSuite.scala      | 10 ++++++++++
 .../apache/spark/kafka010/KafkaTokenUtilSuite.scala    |  9 +++++++++
 6 files changed, 50 insertions(+), 8 deletions(-)

diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index d5224da..fe3c600 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -818,6 +818,11 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
   </tr>
 </table>
 
+#### Kafka Specific Configurations
+
+Kafka's own configurations can be set with the `kafka.` prefix, e.g., `--conf spark.kafka.clusters.${cluster}.kafka.retries=1`.
+For possible Kafka parameters, see [Kafka adminclient config docs](http://kafka.apache.org/documentation.html#adminclientconfigs).
+
 #### Caveats
 
 - Obtaining delegation token for proxy user is not yet supported ([KAFKA-6945](https://issues.apache.org/jira/browse/KAFKA-6945)).
diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala
index 84d58d8..e1f3c80 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala
@@ -23,6 +23,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
 
 private[spark] case class KafkaTokenClusterConf(
     identifier: String,
@@ -35,7 +36,8 @@ private[spark] case class KafkaTokenClusterConf(
     keyStoreLocation: Option[String],
     keyStorePassword: Option[String],
     keyPassword: Option[String],
-    tokenMechanism: String) {
+    tokenMechanism: String,
+    specifiedKafkaParams: Map[String, String]) {
   override def toString: String = s"KafkaTokenClusterConf{" +
     s"identifier=$identifier, " +
     s"authBootstrapServers=$authBootstrapServers, " +
@@ -43,11 +45,12 @@ private[spark] case class KafkaTokenClusterConf(
     s"securityProtocol=$securityProtocol, " +
     s"kerberosServiceName=$kerberosServiceName, " +
     s"trustStoreLocation=$trustStoreLocation, " +
-    s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
+    s"trustStorePassword=${trustStorePassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
     s"keyStoreLocation=$keyStoreLocation, " +
-    s"keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
-    s"keyPassword=${keyPassword.map(_ => "xxx")}, " +
-    s"tokenMechanism=$tokenMechanism}"
+    s"keyStorePassword=${keyStorePassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
+    s"keyPassword=${keyPassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
+    s"tokenMechanism=$tokenMechanism, " +
+    s"specifiedKafkaParams=${KafkaRedactionUtil.redactParams(specifiedKafkaParams.toSeq)}}"
 }
 
 private [kafka010] object KafkaTokenSparkConf extends Logging {
@@ -59,6 +62,8 @@ private [kafka010] object KafkaTokenSparkConf extends Logging {
   def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf = {
     val configPrefix = s"$CLUSTERS_CONFIG_PREFIX$identifier."
     val sparkClusterConf = sparkConf.getAllWithPrefix(configPrefix).toMap
+    val configKafkaPrefix = s"${configPrefix}kafka."
+    val sparkClusterKafkaConf = sparkConf.getAllWithPrefix(configKafkaPrefix).toMap
     val result = KafkaTokenClusterConf(
       identifier,
       sparkClusterConf
@@ -76,7 +81,8 @@ private [kafka010] object KafkaTokenSparkConf extends Logging {
       sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
       sparkClusterConf.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG),
       sparkClusterConf.getOrElse("sasl.token.mechanism",
-        KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
+        KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM),
+      sparkClusterKafkaConf
     )
     logDebug(s"getClusterConfig($identifier): $result")
     result
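
The `kafka.`-prefixed entries above are collected with `SparkConf.getAllWithPrefix`, which returns the matching entries with the prefix stripped. A minimal sketch of that behavior (the cluster name `cluster1` and the `retries` key are hypothetical):

```scala
import org.apache.spark.SparkConf

// getAllWithPrefix strips the matched prefix, so the full config name
// "spark.kafka.clusters.cluster1.kafka.retries" surfaces as just "retries".
val conf = new SparkConf()
  .set("spark.kafka.clusters.cluster1.kafka.retries", "1")
val kafkaParams = conf.getAllWithPrefix("spark.kafka.clusters.cluster1.kafka.").toMap
assert(kafkaParams == Map("retries" -> "1"))
```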
diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
index da21d2e..950df86 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
@@ -134,6 +134,16 @@ private[spark] object KafkaTokenUtil extends Logging {
       }
     }
 
+    logDebug("AdminClient params before specified params: " +
+      s"${KafkaRedactionUtil.redactParams(adminClientProperties.asScala.toSeq)}")
+
+    clusterConf.specifiedKafkaParams.foreach { param =>
+      adminClientProperties.setProperty(param._1, param._2)
+    }
+
+    logDebug("AdminClient params after specified params: " +
+      s"${KafkaRedactionUtil.redactParams(adminClientProperties.asScala.toSeq)}")
+
     adminClientProperties
   }
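
In the `createAdminClientProperties` change above, the specified Kafka params are applied after the properties Spark has already derived, so on a key collision the user-specified value wins. A minimal sketch of that ordering (the property name and both values are hypothetical):

```scala
import java.util.Properties

val adminClientProperties = new Properties()
// A value Spark might have derived earlier (hypothetical).
adminClientProperties.setProperty("retries", "5")
// User-specified kafka.* params are applied last, so they win on collisions.
Map("retries" -> "1").foreach { case (k, v) =>
  adminClientProperties.setProperty(k, v)
}
assert(adminClientProperties.getProperty("retries") == "1")
```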
 
diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
index 74f1cdc..eebbf96 100644
--- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
@@ -107,7 +107,8 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
 
   protected def createClusterConf(
       identifier: String,
-      securityProtocol: String): KafkaTokenClusterConf = {
+      securityProtocol: String,
+      specifiedKafkaParams: Map[String, String] = Map.empty): KafkaTokenClusterConf = {
     KafkaTokenClusterConf(
       identifier,
       bootStrapServers,
@@ -119,6 +120,7 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
       Some(keyStoreLocation),
       Some(keyStorePassword),
       Some(keyPassword),
-      KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
+      KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM,
+      specifiedKafkaParams)
   }
 }
diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala
index 60bb8a2..61184a6 100644
--- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala
@@ -96,6 +96,16 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(clusterConfig.tokenMechanism === tokenMechanism)
   }
 
+  test("getClusterConfig should return specified kafka params") {
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers", authBootStrapServers)
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.kafka.customKey", "customValue")
+
+    val clusterConfig = KafkaTokenSparkConf.getClusterConfig(sparkConf, identifier1)
+    assert(clusterConfig.identifier === identifier1)
+    assert(clusterConfig.authBootstrapServers === authBootStrapServers)
+    assert(clusterConfig.specifiedKafkaParams === Map("customKey" -> "customValue"))
+  }
+
   test("getAllClusterConfigs should return empty list when nothing configured") {
     assert(KafkaTokenSparkConf.getAllClusterConfigs(sparkConf).isEmpty)
   }
diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
index bcca920..5496195 100644
--- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
@@ -155,6 +155,15 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
     assert(saslJaasConfig.contains("useTicketCache=true"))
   }
 
+  test("createAdminClientProperties with specified params should include it") {
+    val clusterConf = createClusterConf(identifier1, SASL_SSL.name,
+      Map("customKey" -> "customValue"))
+
+    val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)
+
+    assert(adminClientProperties.get("customKey") === "customValue")
+  }
+
   test("isGlobalJaasConfigurationProvided without global config should return false") {
     assert(!KafkaTokenUtil.isGlobalJaasConfigurationProvided)
   }

