You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/11/27 04:08:40 UTC

[1/2] git commit: Emit warning when task size > 100KB

Updated Branches:
  refs/heads/master 615213fb8 -> 330ada176


Emit warning when task size > 100KB


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/57579934
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/57579934
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/57579934

Branch: refs/heads/master
Commit: 57579934f0454f258615c10e69ac2adafc5b9835
Parents: 0e2109d
Author: hhd <he...@gmail.com>
Authored: Mon Nov 25 17:17:17 2013 -0500
Committer: hhd <he...@gmail.com>
Committed: Tue Nov 26 16:58:39 2013 -0500

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/DAGScheduler.scala    | 15 +++++++++++++++
 .../scala/org/apache/spark/scheduler/StageInfo.scala |  1 +
 .../scala/org/apache/spark/scheduler/TaskInfo.scala  |  2 ++
 .../scheduler/cluster/ClusterTaskSetManager.scala    |  1 +
 4 files changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/57579934/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 42bb388..4457525 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -110,6 +110,9 @@ class DAGScheduler(
   // resubmit failed stages
   val POLL_TIMEOUT = 10L
 
+  // Warns the user if a stage contains a task with size greater than this value (in KB)
+  val TASK_SIZE_TO_WARN = 100
+
   private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
     override def preStart() {
       context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
@@ -430,6 +433,18 @@ class DAGScheduler(
         handleExecutorLost(execId)
 
       case BeginEvent(task, taskInfo) =>
+        for (
+          job <- idToActiveJob.get(task.stageId);
+          stage <- stageIdToStage.get(task.stageId);
+          stageInfo <- stageToInfos.get(stage)
+        ) {
+          if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 && !stageInfo.emittedTaskSizeWarning) {
+            stageInfo.emittedTaskSizeWarning = true
+            logWarning(("Stage %d (%s) contains a task of very large " +
+              "size (%d KB). The maximum recommended task size is %d KB.").format(
+              task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, TASK_SIZE_TO_WARN))
+          }
+        }
         listenerBus.post(SparkListenerTaskStart(task, taskInfo))
 
       case GettingResultEvent(task, taskInfo) =>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/57579934/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 93599df..e9f2198 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -33,4 +33,5 @@ class StageInfo(
   val name = stage.name
   val numPartitions = stage.numPartitions
   val numTasks = stage.numTasks
+  var emittedTaskSizeWarning = false
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/57579934/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 4bae26f..3c22edd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -46,6 +46,8 @@ class TaskInfo(
 
   var failed = false
 
+  var serializedSize: Int = 0
+
   def markGettingResult(time: Long = System.currentTimeMillis) {
     gettingResultTime = time
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/57579934/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 4c5eca8..8884ea8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -377,6 +377,7 @@ private[spark] class ClusterTaskSetManager(
           logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
             taskSet.id, index, serializedTask.limit, timeTaken))
           val taskName = "task %s:%d".format(taskSet.id, index)
+          info.serializedSize = serializedTask.limit
           if (taskAttempts(index).size == 1)
             taskStarted(task,info)
           return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))


[2/2] git commit: Merge pull request #207 from henrydavidge/master

Posted by ma...@apache.org.
Merge pull request #207 from henrydavidge/master

Log a warning if a task's serialized size is very big

As per Reynold's instructions, we now create a warning-level log entry if a task's serialized size is too big. "Too big" is currently defined as 100 KB. This warning message is generated at most once per stage.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/330ada17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/330ada17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/330ada17

Branch: refs/heads/master
Commit: 330ada1766c1f8a7274b5566fa66b796329d7054
Parents: 615213f 5757993
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Tue Nov 26 19:08:33 2013 -0800
Committer: Matei Zaharia <ma...@eecs.berkeley.edu>
Committed: Tue Nov 26 19:08:33 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/DAGScheduler.scala    | 15 +++++++++++++++
 .../scala/org/apache/spark/scheduler/StageInfo.scala |  1 +
 .../scala/org/apache/spark/scheduler/TaskInfo.scala  |  2 ++
 .../scheduler/cluster/ClusterTaskSetManager.scala    |  1 +
 4 files changed, 19 insertions(+)
----------------------------------------------------------------------