You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/14 02:00:44 UTC

[spark] branch branch-3.0 updated: [SPARK-33557][CORE][MESOS][3.0] Ensure the relationship between STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT and NETWORK_TIMEOUT

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fcd10a6  [SPARK-33557][CORE][MESOS][3.0] Ensure the relationship between STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT and NETWORK_TIMEOUT
fcd10a6 is described below

commit fcd10a66dc0a4e1f5ef1da7b0da7fcd5711674e6
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Thu Jan 14 11:00:02 2021 +0900

    [SPARK-33557][CORE][MESOS][3.0] Ensure the relationship between STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT and NETWORK_TIMEOUT
    
    ### What changes were proposed in this pull request?
    As described in SPARK-33557, `HeartbeatReceiver` and `MesosCoarseGrainedSchedulerBackend` always use `Network.NETWORK_TIMEOUT.defaultValueString` as the value of `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` when `NETWORK_TIMEOUT` is configured but `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` is not. This differs from the relationship described in `configuration.md`.
    
    To fix this problem, the main changes of this PR are as follows:
    
    - Remove the explicit default value of `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT`
    
    - In `HeartbeatReceiver` and `MesosCoarseGrainedSchedulerBackend`, use the actual value of `NETWORK_TIMEOUT` as `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` when `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` is not configured
    
    ### Why are the changes needed?
    To ensure that the relationship between `NETWORK_TIMEOUT` and `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` matches what is described in `configuration.md`.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    - Pass Jenkins or GitHub Actions
    
    - Manually tested by configuring `NETWORK_TIMEOUT` and `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` locally
    
    Closes #31175 from dongjoon-hyun/SPARK-33557.
    
    Authored-by: yangjie01 <ya...@baidu.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala         | 4 +++-
 core/src/main/scala/org/apache/spark/internal/config/package.scala   | 2 +-
 .../test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala  | 1 +
 .../scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 5 ++++-
 4 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 2ac72e6..be63072 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -80,7 +80,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
   // executor ID -> timestamp of when the last heartbeat from this executor was received
   private val executorLastSeen = new HashMap[String, Long]
 
-  private val executorTimeoutMs = sc.conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT)
+  private val executorTimeoutMs = sc.conf.get(
+    config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT
+  ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
 
   private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index f3474ef..0afbb52 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -434,7 +434,7 @@ package object config {
     ConfigBuilder("spark.storage.blockManagerSlaveTimeoutMs")
       .version("0.7.0")
       .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString(Network.NETWORK_TIMEOUT.defaultValueString)
+      .createOptional
 
   private[spark] val STORAGE_CLEANUP_FILES_AFTER_EXECUTOR_EXIT =
     ConfigBuilder("spark.storage.cleanupFilesAfterExecutorExit")
diff --git a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
index 5428fa4..818b990 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
@@ -233,6 +233,7 @@ class ExecutorClassLoaderSuite
       .setMaster("local")
       .setAppName("executor-class-loader-test")
       .set("spark.network.timeout", "11s")
+      .set("spark.network.timeoutInterval", "11s")
       .set("spark.repl.class.outputDir", tempDir1.getAbsolutePath)
     val sc = new SparkContext(conf)
     try {
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e916125..dff0b06 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -34,6 +34,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkExceptio
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Network
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.network.netty.SparkTransportConf
@@ -643,7 +644,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           .registerDriverWithShuffleService(
             slave.hostname,
             externalShufflePort,
-            sc.conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT),
+            sc.conf.get(
+              config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT
+            ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")),
             sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL))
         slave.shuffleRegistered = true
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org