You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/14 02:00:44 UTC
[spark] branch branch-3.0 updated: [SPARK-33557][CORE][MESOS][3.0]
Ensure the relationship between STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT and
NETWORK_TIMEOUT
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new fcd10a6 [SPARK-33557][CORE][MESOS][3.0] Ensure the relationship between STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT and NETWORK_TIMEOUT
fcd10a6 is described below
commit fcd10a66dc0a4e1f5ef1da7b0da7fcd5711674e6
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Thu Jan 14 11:00:02 2021 +0900
[SPARK-33557][CORE][MESOS][3.0] Ensure the relationship between STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT and NETWORK_TIMEOUT
### What changes were proposed in this pull request?
As described in SPARK-33557, `HeartbeatReceiver` and `MesosCoarseGrainedSchedulerBackend` always use `Network.NETWORK_TIMEOUT.defaultValueString` as the value of `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` when `NETWORK_TIMEOUT` is configured but `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` is not. This differs from the relationship described in `configuration.md`.
To fix this problem, the main changes of this PR are as follows:
- Remove the explicit default value of `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT`
- In `HeartbeatReceiver` and `MesosCoarseGrainedSchedulerBackend`, use the actual value of `NETWORK_TIMEOUT` as `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` when `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` is not configured
### Why are the changes needed?
To ensure that the relationship between `NETWORK_TIMEOUT` and `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` matches what is described in `configuration.md`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass the Jenkins or GitHub Action
- Manually tested by configuring `NETWORK_TIMEOUT` and `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` locally
Closes #31175 from dongjoon-hyun/SPARK-33557.
Authored-by: yangjie01 <ya...@baidu.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 4 +++-
core/src/main/scala/org/apache/spark/internal/config/package.scala | 2 +-
.../test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala | 1 +
.../scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 5 ++++-
4 files changed, 9 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 2ac72e6..be63072 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -80,7 +80,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
// executor ID -> timestamp of when the last heartbeat from this executor was received
private val executorLastSeen = new HashMap[String, Long]
- private val executorTimeoutMs = sc.conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT)
+ private val executorTimeoutMs = sc.conf.get(
+ config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT
+ ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index f3474ef..0afbb52 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -434,7 +434,7 @@ package object config {
ConfigBuilder("spark.storage.blockManagerSlaveTimeoutMs")
.version("0.7.0")
.timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString(Network.NETWORK_TIMEOUT.defaultValueString)
+ .createOptional
private[spark] val STORAGE_CLEANUP_FILES_AFTER_EXECUTOR_EXIT =
ConfigBuilder("spark.storage.cleanupFilesAfterExecutorExit")
diff --git a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
index 5428fa4..818b990 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
@@ -233,6 +233,7 @@ class ExecutorClassLoaderSuite
.setMaster("local")
.setAppName("executor-class-loader-test")
.set("spark.network.timeout", "11s")
+ .set("spark.network.timeoutInterval", "11s")
.set("spark.repl.class.outputDir", tempDir1.getAbsolutePath)
val sc = new SparkContext(conf)
try {
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e916125..dff0b06 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -34,6 +34,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkExceptio
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Network
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.network.netty.SparkTransportConf
@@ -643,7 +644,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
.registerDriverWithShuffleService(
slave.hostname,
externalShufflePort,
- sc.conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT),
+ sc.conf.get(
+ config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT
+ ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")),
sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL))
slave.shuffleRegistered = true
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org