You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2018/05/24 20:00:28 UTC
spark git commit: [SPARK-24332][SS][MESOS] Fix places reading
'spark.network.timeout' as milliseconds
Repository: spark
Updated Branches:
refs/heads/master 0d8994344 -> 53c06ddab
[SPARK-24332][SS][MESOS] Fix places reading 'spark.network.timeout' as milliseconds
## What changes were proposed in this pull request?
This PR replaces `getTimeAsMs` with `getTimeAsSeconds` to fix an issue where "spark.network.timeout" was read using the wrong time unit when the user doesn't specify a timeout.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <zs...@gmail.com>
Closes #21382 from zsxwing/fix-network-timeout-conf.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53c06dda
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53c06dda
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53c06dda
Branch: refs/heads/master
Commit: 53c06ddabbdf689f8823807445849ad63173676f
Parents: 0d89943
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Thu May 24 13:00:24 2018 -0700
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Thu May 24 13:00:24 2018 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala | 2 +-
.../main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala | 4 +++-
.../main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 2 +-
.../scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala | 2 +-
.../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 2 +-
5 files changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/53c06dda/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
index 64ba987..737da2e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -68,7 +68,7 @@ private[kafka010] class KafkaMicroBatchReader(
private val pollTimeoutMs = options.getLong(
"kafkaConsumer.pollTimeoutMs",
- SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+ SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
private val maxOffsetsPerTrigger =
Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
http://git-wip-us.apache.org/repos/asf/spark/blob/53c06dda/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
index 7103709..c31e6ed 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
@@ -48,7 +48,9 @@ private[kafka010] class KafkaRelation(
private val pollTimeoutMs = sourceOptions.getOrElse(
"kafkaConsumer.pollTimeoutMs",
- sqlContext.sparkContext.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+ (sqlContext.sparkContext.conf.getTimeAsSeconds(
+ "spark.network.timeout",
+ "120s") * 1000L).toString
).toLong
override def schema: StructType = KafkaOffsetReader.kafkaSchema
http://git-wip-us.apache.org/repos/asf/spark/blob/53c06dda/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 1c7b3a2..101e649 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -84,7 +84,7 @@ private[kafka010] class KafkaSource(
private val pollTimeoutMs = sourceOptions.getOrElse(
"kafkaConsumer.pollTimeoutMs",
- sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+ (sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L).toString
).toLong
private val maxOffsetsPerTrigger =
http://git-wip-us.apache.org/repos/asf/spark/blob/53c06dda/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index 81abc98..3efc90f 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -65,7 +65,7 @@ private[spark] class KafkaRDD[K, V](
// TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
- conf.getTimeAsMs("spark.network.timeout", "120s"))
+ conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
private val cacheInitialCapacity =
conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
private val cacheMaxCapacity =
http://git-wip-us.apache.org/repos/asf/spark/blob/53c06dda/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 9b75e4c..d35bea4 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -634,7 +634,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
slave.hostname,
externalShufflePort,
sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
- s"${sc.conf.getTimeAsMs("spark.network.timeout", "120s")}ms"),
+ s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L}ms"),
sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
slave.shuffleRegistered = true
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org