You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@spark.apache.org by ma...@apache.org on 2016/11/03 21:43:29 UTC

spark git commit: [SPARK-18212][SS][KAFKA] increase executor poll timeout

Repository: spark
Updated Branches:
  refs/heads/master 098e4ca9c -> 67659c9af


[SPARK-18212][SS][KAFKA] increase executor poll timeout

## What changes were proposed in this pull request?

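Increase the executor-side Kafka consumer poll timeout to try to address a flaky test. In both the Structured Streaming `KafkaSource` and the DStream `KafkaRDD`, the default poll timeout changes from 512 ms to the value of `spark.network.timeout` (120s unless configured otherwise). An explicitly set `kafkaConsumer.pollTimeoutMs` source option or `spark.streaming.kafka.consumer.poll.ms` configuration still takes precedence.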

## How was this patch tested?

Ran existing unit tests

Author: cody koeninger <co...@koeninger.org>

Closes #15737 from koeninger/SPARK-18212.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67659c9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67659c9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67659c9a

Branch: refs/heads/master
Commit: 67659c9afaeb2289e56fd87fafee953e8f050383
Parents: 098e4ca
Author: cody koeninger <co...@koeninger.org>
Authored: Thu Nov 3 14:43:25 2016 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Nov 3 14:43:25 2016 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala  | 5 ++++-
 .../scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala    | 3 ++-
 2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/67659c9a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 61cba73..b21508c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -88,7 +88,10 @@ private[kafka010] case class KafkaSource(
 
   private val sc = sqlContext.sparkContext
 
-  private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+  private val pollTimeoutMs = sourceOptions.getOrElse(
+    "kafkaConsumer.pollTimeoutMs",
+    sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
 
   private val maxOffsetFetchAttempts =
     sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt

http://git-wip-us.apache.org/repos/asf/spark/blob/67659c9a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index 5b5a9ac..9839425 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -66,7 +66,8 @@ private[spark] class KafkaRDD[K, V](
       " must be set to false for executor kafka params, else offsets may commit before processing")
 
   // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
-  private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
+  private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
+    conf.getTimeAsMs("spark.network.timeout", "120s"))
   private val cacheInitialCapacity =
     conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
   private val cacheMaxCapacity =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org