You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2019/02/27 18:07:24 UTC
[spark] branch master updated: [SPARK-27002][SS] Get kafka
delegation tokens right before consumer/producer created
This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 76e0b6b [SPARK-27002][SS] Get kafka delegation tokens right before consumer/producer created
76e0b6b is described below
commit 76e0b6bafb66a5cd02edcf924a90fc389fddea8e
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Wed Feb 27 10:07:02 2019 -0800
[SPARK-27002][SS] Get kafka delegation tokens right before consumer/producer created
## What changes were proposed in this pull request?
Spark does not always pick up the latest Kafka delegation token, even when a new one has been obtained properly.
In this PR, delegation tokens are set right before each `KafkaConsumer` and `KafkaProducer` is created, to be on the safe side.
## How was this patch tested?
Long-running Kafka-to-Kafka tests on a 4-node cluster, with randomly thrown artificial exceptions.
Test scenario:
* 4 node cluster
* Yarn
* Kafka broker version 2.1.0
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-512
Kafka broker settings:
* delegation.token.expiry.time.ms=600000 (10 min)
* delegation.token.max.lifetime.ms=1200000 (20 min)
* delegation.token.expiry.check.interval.ms=300000 (5 min)
Every 7.5 minutes (10 min * 0.75), a new delegation token is obtained from the Kafka broker.
However, consider the following sequence:
1. A token expires after 10 minutes. Spark obtains a new token and does not renew the old one.
2. The broker's expiration thread, which runs every 5 minutes, invalidates the expired token.
3. An artificial exception is thrown inside the Spark application, in which case Spark closes the connection.

When this happens, the latest delegation token is not always picked up.
Closes #23906 from gaborgsomogyi/SPARK-27002.
Authored-by: Gabor Somogyi <ga...@gmail.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
.../spark/sql/kafka010/CachedKafkaProducer.scala | 8 ++++++--
.../apache/spark/sql/kafka010/ConsumerStrategy.scala | 18 +++++++++++++++---
.../apache/spark/sql/kafka010/KafkaConfigUpdater.scala | 2 +-
.../apache/spark/sql/kafka010/KafkaDataConsumer.scala | 5 ++++-
.../spark/sql/kafka010/KafkaSourceProvider.scala | 3 ---
5 files changed, 26 insertions(+), 10 deletions(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
index cd680ad..f24001f 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
@@ -64,8 +64,12 @@ private[kafka010] object CachedKafkaProducer extends Logging {
.build[Seq[(String, Object)], Producer](cacheLoader)
private def createKafkaProducer(producerConfiguration: ju.Map[String, Object]): Producer = {
- val kafkaProducer: Producer = new Producer(producerConfiguration)
- logDebug(s"Created a new instance of KafkaProducer for $producerConfiguration.")
+ val updatedKafkaProducerConfiguration =
+ KafkaConfigUpdater("executor", producerConfiguration.asScala.toMap)
+ .setAuthenticationConfigIfNeeded()
+ .build()
+ val kafkaProducer: Producer = new Producer(updatedKafkaProducerConfiguration)
+ logDebug(s"Created a new instance of KafkaProducer for $updatedKafkaProducerConfiguration.")
kafkaProducer
}
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
index 66511b3..dfdafce 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
@@ -37,6 +37,15 @@ import org.apache.kafka.common.TopicPartition
sealed trait ConsumerStrategy {
/** Create a [[KafkaConsumer]] and subscribe to topics according to a desired strategy */
def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
+
+ /**
+ * Updates the parameters with security if needed.
+ * Added a function to hide internals and reduce code duplications because all strategy uses it.
+ */
+ protected def setAuthenticationConfigIfNeeded(kafkaParams: ju.Map[String, Object]) =
+ KafkaConfigUpdater("source", kafkaParams.asScala.toMap)
+ .setAuthenticationConfigIfNeeded()
+ .build()
}
/**
@@ -45,7 +54,8 @@ sealed trait ConsumerStrategy {
case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy {
override def createConsumer(
kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
- val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+ val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
+ val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
consumer.assign(ju.Arrays.asList(partitions: _*))
consumer
}
@@ -59,7 +69,8 @@ case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStr
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy {
override def createConsumer(
kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
- val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+ val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
+ val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
consumer.subscribe(topics.asJava)
consumer
}
@@ -73,7 +84,8 @@ case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy {
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy {
override def createConsumer(
kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
- val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+ val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
+ val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
consumer.subscribe(
ju.regex.Pattern.compile(topicPattern),
new NoOpConsumerRebalanceListener())
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
index 38bf5d7..978dfe6 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
@@ -31,7 +31,7 @@ import org.apache.spark.kafka010.KafkaTokenUtil
/**
* Class to conveniently update Kafka config params, while logging the changes
*/
-private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, String])
+private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object])
extends Logging {
private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index 7b1314b..a0255a1 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -197,7 +197,10 @@ private[kafka010] case class InternalKafkaConsumer(
/** Create a KafkaConsumer to fetch records for `topicPartition` */
private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
- val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+ val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
+ .setAuthenticationConfigIfNeeded()
+ .build()
+ val c = new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
val tps = new ju.ArrayList[TopicPartition]()
tps.add(topicPartition)
c.assign(tps)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 6994517..a139573 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -525,7 +525,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
// If buffer config is not set, set it to reasonable value to work around
// buffer issues (see KAFKA-3135)
.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
- .setAuthenticationConfigIfNeeded()
.build()
def kafkaParamsForExecutors(
@@ -547,7 +546,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
// If buffer config is not set, set it to reasonable value to work around
// buffer issues (see KAFKA-3135)
.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
- .setAuthenticationConfigIfNeeded()
.build()
/**
@@ -582,7 +580,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
KafkaConfigUpdater("executor", specifiedKafkaParams)
.set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
.set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
- .setAuthenticationConfigIfNeeded()
.build()
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org