You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2018/05/24 20:00:28 UTC

spark git commit: [SPARK-24332][SS][MESOS] Fix places reading 'spark.network.timeout' as milliseconds

Repository: spark
Updated Branches:
  refs/heads/master 0d8994344 -> 53c06ddab


[SPARK-24332][SS][MESOS] Fix places reading 'spark.network.timeout' as milliseconds

## What changes were proposed in this pull request?

This PR replaces `getTimeAsMs` with `getTimeAsSeconds` to fix the issue that reading "spark.network.timeout" using a wrong time unit when the user doesn't specify a time out.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zs...@gmail.com>

Closes #21382 from zsxwing/fix-network-timeout-conf.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53c06dda
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53c06dda
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53c06dda

Branch: refs/heads/master
Commit: 53c06ddabbdf689f8823807445849ad63173676f
Parents: 0d89943
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Thu May 24 13:00:24 2018 -0700
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Thu May 24 13:00:24 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala    | 2 +-
 .../main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala | 4 +++-
 .../main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala   | 2 +-
 .../scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala     | 2 +-
 .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala       | 2 +-
 5 files changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/53c06dda/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
index 64ba987..737da2e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -68,7 +68,7 @@ private[kafka010] class KafkaMicroBatchReader(
 
   private val pollTimeoutMs = options.getLong(
     "kafkaConsumer.pollTimeoutMs",
-    SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+    SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
 
   private val maxOffsetsPerTrigger =
     Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)

http://git-wip-us.apache.org/repos/asf/spark/blob/53c06dda/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
index 7103709..c31e6ed 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
@@ -48,7 +48,9 @@ private[kafka010] class KafkaRelation(
 
   private val pollTimeoutMs = sourceOptions.getOrElse(
     "kafkaConsumer.pollTimeoutMs",
-    sqlContext.sparkContext.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+    (sqlContext.sparkContext.conf.getTimeAsSeconds(
+      "spark.network.timeout",
+      "120s") * 1000L).toString
   ).toLong
 
   override def schema: StructType = KafkaOffsetReader.kafkaSchema

http://git-wip-us.apache.org/repos/asf/spark/blob/53c06dda/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 1c7b3a2..101e649 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -84,7 +84,7 @@ private[kafka010] class KafkaSource(
 
   private val pollTimeoutMs = sourceOptions.getOrElse(
     "kafkaConsumer.pollTimeoutMs",
-    sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+    (sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L).toString
   ).toLong
 
   private val maxOffsetsPerTrigger =

http://git-wip-us.apache.org/repos/asf/spark/blob/53c06dda/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index 81abc98..3efc90f 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -65,7 +65,7 @@ private[spark] class KafkaRDD[K, V](
 
   // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
   private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
-    conf.getTimeAsMs("spark.network.timeout", "120s"))
+    conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
   private val cacheInitialCapacity =
     conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
   private val cacheMaxCapacity =

http://git-wip-us.apache.org/repos/asf/spark/blob/53c06dda/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 9b75e4c..d35bea4 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -634,7 +634,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             slave.hostname,
             externalShufflePort,
             sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
-              s"${sc.conf.getTimeAsMs("spark.network.timeout", "120s")}ms"),
+              s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L}ms"),
             sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
         slave.shuffleRegistered = true
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org