You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/07/15 23:31:08 UTC

spark git commit: [SPARK-8974] Catch exceptions in allocation schedule task.

Repository: spark
Updated Branches:
  refs/heads/master b9a922e26 -> 674eb2a4c


[SPARK-8974] Catch exceptions in allocation schedule task.

I meet a problem. When I submit some tasks, the thread spark-dynamic-executor-allocation should seed the message about "requestTotalExecutors", and the new executor should start. But I meet a problem about this thread, like:

2015-07-14 19:02:17,461 | WARN  | [spark-dynamic-executor-allocation] | Error sending message [message = RequestExecutors(1)] in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
        at org.apache.spark.scheduler.cluster.YarnSchedulerBackend.doRequestTotalExecutors(YarnSchedulerBackend.scala:57)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:351)
        at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1382)
        at org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:343)
        at org.apache.spark.ExecutorAllocationManager.updateAndSyncNumExecutorsTarget(ExecutorAllocationManager.scala:295)
        at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:248)

when after some minutes, I find a new ApplicationMaster start,  and tasks submitted start to run. The tasks Completed. And after long time (eg, ten minutes), the number of executor  does not reduce to zero.  I use the default value of "spark.dynamicAllocation.minExecutors".

Author: KaiXinXiaoLei <hu...@huawei.com>

Closes #7352 from KaiXinXiaoLei/dym and squashes the following commits:

3603631 [KaiXinXiaoLei] change logError to logWarning
efc4f24 [KaiXinXiaoLei] change file


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/674eb2a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/674eb2a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/674eb2a4

Branch: refs/heads/master
Commit: 674eb2a4c3ff595760f990daf369ba75d2547593
Parents: b9a922e
Author: KaiXinXiaoLei <hu...@huawei.com>
Authored: Wed Jul 15 22:31:10 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Jul 15 22:31:10 2015 +0100

----------------------------------------------------------------------
 .../org/apache/spark/ExecutorAllocationManager.scala    | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/674eb2a4/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 0c50b40..648bcfe 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
+import scala.util.control.ControlThrowable
 
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
@@ -211,7 +212,16 @@ private[spark] class ExecutorAllocationManager(
     listenerBus.addListener(listener)
 
     val scheduleTask = new Runnable() {
-      override def run(): Unit = Utils.logUncaughtExceptions(schedule())
+      override def run(): Unit = {
+        try {
+          schedule()
+        } catch {
+          case ct: ControlThrowable =>
+            throw ct
+          case t: Throwable =>
+            logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
+        }
+      }
     }
     executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org