You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2016/11/03 21:43:29 UTC
spark git commit: [SPARK-18212][SS][KAFKA] increase executor poll timeout
Repository: spark
Updated Branches:
refs/heads/master 098e4ca9c -> 67659c9af
[SPARK-18212][SS][KAFKA] increase executor poll timeout
## What changes were proposed in this pull request?
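Increase the default Kafka consumer poll timeout on executors to try to address a flaky test. When the poll timeout is not set explicitly, both KafkaSource and KafkaRDD now fall back to the value of spark.network.timeout (120s if unset) instead of the previous hard-coded 512 ms.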
## How was this patch tested?
Ran existing unit tests
Author: cody koeninger <co...@koeninger.org>
Closes #15737 from koeninger/SPARK-18212.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67659c9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67659c9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67659c9a
Branch: refs/heads/master
Commit: 67659c9afaeb2289e56fd87fafee953e8f050383
Parents: 098e4ca
Author: cody koeninger <co...@koeninger.org>
Authored: Thu Nov 3 14:43:25 2016 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Nov 3 14:43:25 2016 -0700
----------------------------------------------------------------------
.../main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 5 ++++-
.../scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala | 3 ++-
2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/67659c9a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 61cba73..b21508c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -88,7 +88,10 @@ private[kafka010] case class KafkaSource(
private val sc = sqlContext.sparkContext
- private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+ private val pollTimeoutMs = sourceOptions.getOrElse(
+ "kafkaConsumer.pollTimeoutMs",
+ sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+ ).toLong
private val maxOffsetFetchAttempts =
sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
http://git-wip-us.apache.org/repos/asf/spark/blob/67659c9a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index 5b5a9ac..9839425 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -66,7 +66,8 @@ private[spark] class KafkaRDD[K, V](
" must be set to false for executor kafka params, else offsets may commit before processing")
// TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
- private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
+ private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
+ conf.getTimeAsMs("spark.network.timeout", "120s"))
private val cacheInitialCapacity =
conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
private val cacheMaxCapacity =
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org