Posted to commits@spark.apache.org by rx...@apache.org on 2014/07/01 10:56:57 UTC

git commit: [SPARK-2185] Emit warning when task size exceeds a threshold.

Repository: spark
Updated Branches:
  refs/heads/master 3319a3e3c -> 05c3d90e3


[SPARK-2185] Emit warning when task size exceeds a threshold.

This functionality was added in an earlier commit but was removed
shortly afterward due to a bad git merge (totally my fault).

Author: Kay Ousterhout <ka...@gmail.com>

Closes #1149 from kayousterhout/warning_bug and squashes the following commits:

3f1bb00 [Kay Ousterhout] Fixed Json tests
462a664 [Kay Ousterhout] Removed task set name from warning message
e89b2f6 [Kay Ousterhout] Fixed Json tests.
7af424c [Kay Ousterhout] Emit warning when task size exceeds a threshold.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/05c3d90e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/05c3d90e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/05c3d90e

Branch: refs/heads/master
Commit: 05c3d90e3527114d3abc08c7cc87f6efef96ebdc
Parents: 3319a3e
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Tue Jul 1 01:56:51 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Tue Jul 1 01:56:51 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 13 -------
 .../org/apache/spark/scheduler/StageInfo.scala  |  2 -
 .../org/apache/spark/scheduler/TaskInfo.scala   |  2 -
 .../apache/spark/scheduler/TaskSetManager.scala | 15 +++++++
 .../org/apache/spark/util/JsonProtocol.scala    | 10 +----
 .../spark/scheduler/TaskSetManagerSuite.scala   | 41 ++++++++++++++++++++
 .../apache/spark/util/JsonProtocolSuite.scala   | 18 ++++-----
 7 files changed, 65 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
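
For readers who want to see the reinstated warning fire, a hypothetical driver
program (names and sizes are illustrative, not part of this patch): closing over
a large array ships it inside every serialized task and trips the 100 KB
threshold, while broadcasting the same data keeps the tasks small.

    import org.apache.spark.{SparkConf, SparkContext}

    object LargeClosureExample {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("demo"))

        // A ~1 MB array captured by the closure is serialized into every task,
        // so each task exceeds TASK_SIZE_TO_WARN_KB (100 KB) and the
        // TaskSetManager logs the new warning once for the task set.
        val lookup = new Array[Byte](1024 * 1024)
        sc.parallelize(1 to 4).map(i => lookup.length + i).collect()

        // Broadcasting the data instead keeps the serialized tasks small,
        // so no warning is emitted for this task set.
        val broadcastLookup = sc.broadcast(lookup)
        sc.parallelize(1 to 4).map(i => broadcastLookup.value.length + i).collect()

        sc.stop()
      }
    }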


http://git-wip-us.apache.org/repos/asf/spark/blob/05c3d90e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 813a9ab..81c136d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -623,16 +623,6 @@ class DAGScheduler(
   }
 
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
-    for (stage <- stageIdToStage.get(task.stageId); stageInfo <- stageToInfos.get(stage)) {
-      if (taskInfo.serializedSize > DAGScheduler.TASK_SIZE_TO_WARN * 1024 &&
-        !stageInfo.emittedTaskSizeWarning) {
-        stageInfo.emittedTaskSizeWarning = true
-        logWarning(("Stage %d (%s) contains a task of very large " +
-          "size (%d KB). The maximum recommended task size is %d KB.").format(
-            task.stageId, stageInfo.name, taskInfo.serializedSize / 1024,
-            DAGScheduler.TASK_SIZE_TO_WARN))
-      }
-    }
     listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
     submitWaitingStages()
   }
@@ -1254,7 +1244,4 @@ private[spark] object DAGScheduler {
   // The time, in millis, to wake up between polls of the completion queue in order to potentially
   // resubmit failed stages
   val POLL_TIMEOUT = 10L
-
-  // Warns the user if a stage contains a task with size greater than this value (in KB)
-  val TASK_SIZE_TO_WARN = 100
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/05c3d90e/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 7644e3f..4808915 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -38,8 +38,6 @@ class StageInfo(
   /** If the stage failed, the reason why. */
   var failureReason: Option[String] = None
 
-  var emittedTaskSizeWarning = false
-
   def stageFailed(reason: String) {
     failureReason = Some(reason)
     completionTime = Some(System.currentTimeMillis)

http://git-wip-us.apache.org/repos/asf/spark/blob/05c3d90e/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 6aecdfe..29de045 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -49,8 +49,6 @@ class TaskInfo(
 
   var failed = false
 
-  var serializedSize: Int = 0
-
   private[spark] def markGettingResult(time: Long = System.currentTimeMillis) {
     gettingResultTime = time
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/05c3d90e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 83ff6b8..059cc90 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -166,6 +166,8 @@ private[spark] class TaskSetManager(
 
   override def schedulingMode = SchedulingMode.NONE
 
+  var emittedTaskSizeWarning = false
+
   /**
    * Add a task to all the pending-task lists that it should be on. If readding is set, we are
    * re-adding the task so only include it in each list if it's not already there.
@@ -418,6 +420,13 @@ private[spark] class TaskSetManager(
           // we assume the task can be serialized without exceptions.
           val serializedTask = Task.serializeWithDependencies(
             task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+          if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
+              !emittedTaskSizeWarning) {
+            emittedTaskSizeWarning = true
+            logWarning(s"Stage ${task.stageId} contains a task of very large size " +
+              s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
+              s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
+          }
           val timeTaken = clock.getTime() - startTime
           addRunningTask(taskId)
           logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
@@ -764,3 +773,9 @@ private[spark] class TaskSetManager(
     localityWaits = myLocalityLevels.map(getLocalityWait)
   }
 }
+
+private[spark] object TaskSetManager {
+  // The user will be warned if any stages contain a task that has a serialized size greater than
+  // this.
+  val TASK_SIZE_TO_WARN_KB = 100
+}
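
Two details worth noting in the hunk above: the emittedTaskSizeWarning flag now
lives on the TaskSetManager, so the warning fires at most once per task set
(the removed StageInfo flag was once per stage), and the size is read straight
off the ByteBuffer returned by Task.serializeWithDependencies rather than from
the now-deleted TaskInfo.serializedSize. To estimate a closure's serialized
size yourself, a rough sketch (plain Java serialization only; the size Spark
measures also includes the task's file and jar dependency maps, so treat this
as a lower bound):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    object SerializedSize {
      // Size of obj under plain Java serialization; obj must be Serializable.
      def of(obj: AnyRef): Int = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        out.writeObject(obj)
        out.close()
        bytes.size()
      }
    }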

http://git-wip-us.apache.org/repos/asf/spark/blob/05c3d90e/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 26c9c9d..47eb44b 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -190,8 +190,7 @@ private[spark] object JsonProtocol {
     ("Details" -> stageInfo.details) ~
     ("Submission Time" -> submissionTime) ~
     ("Completion Time" -> completionTime) ~
-    ("Failure Reason" -> failureReason) ~
-    ("Emitted Task Size Warning" -> stageInfo.emittedTaskSizeWarning)
+    ("Failure Reason" -> failureReason)
   }
 
   def taskInfoToJson(taskInfo: TaskInfo): JValue = {
@@ -205,8 +204,7 @@ private[spark] object JsonProtocol {
     ("Speculative" -> taskInfo.speculative) ~
     ("Getting Result Time" -> taskInfo.gettingResultTime) ~
     ("Finish Time" -> taskInfo.finishTime) ~
-    ("Failed" -> taskInfo.failed) ~
-    ("Serialized Size" -> taskInfo.serializedSize)
+    ("Failed" -> taskInfo.failed)
   }
 
   def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
@@ -487,13 +485,11 @@ private[spark] object JsonProtocol {
     val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
     val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
-    val emittedTaskSizeWarning = (json \ "Emitted Task Size Warning").extract[Boolean]
 
     val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, details)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason
-    stageInfo.emittedTaskSizeWarning = emittedTaskSizeWarning
     stageInfo
   }
 
@@ -509,14 +505,12 @@ private[spark] object JsonProtocol {
     val gettingResultTime = (json \ "Getting Result Time").extract[Long]
     val finishTime = (json \ "Finish Time").extract[Long]
     val failed = (json \ "Failed").extract[Boolean]
-    val serializedSize = (json \ "Serialized Size").extract[Int]
 
     val taskInfo =
       new TaskInfo(taskId, index, attempt, launchTime, executorId, host, taskLocality, speculative)
     taskInfo.gettingResultTime = gettingResultTime
     taskInfo.finishTime = finishTime
     taskInfo.failed = failed
-    taskInfo.serializedSize = serializedSize
     taskInfo
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/05c3d90e/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 59a6189..9ff2a48 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.util.Random
+
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable
 
@@ -83,6 +85,18 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
   }
 }
 
+/**
+ * A Task implementation that results in a large serialized task.
+ */
+class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0) {
+  val randomBuffer = new Array[Byte](TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024)
+  val random = new Random(0)
+  random.nextBytes(randomBuffer)
+
+  override def runTask(context: TaskContext): Array[Byte] = randomBuffer
+  override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]()
+}
+
 class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
   import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
 
@@ -434,6 +448,33 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
   }
 
+  test("do not emit warning when serialized task is small") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    val taskSet = FakeTask.createTaskSet(1)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+
+    assert(!manager.emittedTaskSizeWarning)
+
+    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
+
+    assert(!manager.emittedTaskSizeWarning)
+  }
+
+  test("emit warning when serialized task is large") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+
+    val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+
+    assert(!manager.emittedTaskSizeWarning)
+
+    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
+
+    assert(manager.emittedTaskSizeWarning)
+  }
+
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
     val valueSer = SparkEnv.get.serializer.newInstance()
     new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)
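
A note on why the second test passes: LargeTask's buffer is exactly
TASK_SIZE_TO_WARN_KB kilobytes and the check in resourceOffer uses a strict >,
so the assertion rests on serialization overhead (the object-stream header, the
task's other fields, and the file/jar dependency maps added by
Task.serializeWithDependencies) pushing the serialized size past the threshold.
The SerializedSize sketch above is a quick way to see that overhead for any
Serializable object.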

http://git-wip-us.apache.org/repos/asf/spark/blob/05c3d90e/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 316e141..058d314 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -257,7 +257,6 @@ class JsonProtocolSuite extends FunSuite {
     assert(info1.numTasks === info2.numTasks)
     assert(info1.submissionTime === info2.submissionTime)
     assert(info1.completionTime === info2.completionTime)
-    assert(info1.emittedTaskSizeWarning === info2.emittedTaskSizeWarning)
     assert(info1.rddInfos.size === info2.rddInfos.size)
     (0 until info1.rddInfos.size).foreach { i =>
       assertEquals(info1.rddInfos(i), info2.rddInfos(i))
@@ -294,7 +293,6 @@ class JsonProtocolSuite extends FunSuite {
     assert(info1.gettingResultTime === info2.gettingResultTime)
     assert(info1.finishTime === info2.finishTime)
     assert(info1.failed === info2.failed)
-    assert(info1.serializedSize === info2.serializedSize)
   }
 
   private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
@@ -540,9 +538,8 @@ class JsonProtocolSuite extends FunSuite {
   private val stageSubmittedJsonString =
     """
       {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name":
-      "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details",
-      "Emitted Task Size Warning":false},"Properties":{"France":"Paris","Germany":"Berlin",
-      "Russia":"Moscow","Ukraine":"Kiev"}}
+      "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details"},"Properties":
+      {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
     """
 
   private val stageCompletedJsonString =
@@ -551,8 +548,7 @@ class JsonProtocolSuite extends FunSuite {
       "greetings","Number of Tasks":201,"RDD Info":[{"RDD ID":101,"Name":"mayor","Storage
       Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true,
       "Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301,
-      "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details",
-      "Emitted Task Size Warning":false}}
+      "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details"}}
     """
 
   private val taskStartJsonString =
@@ -560,7 +556,7 @@ class JsonProtocolSuite extends FunSuite {
       |{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
       |"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
       |"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,
-      |"Failed":false,"Serialized Size":0}}
+      |"Failed":false}}
     """.stripMargin
 
   private val taskGettingResultJsonString =
@@ -568,7 +564,7 @@ class JsonProtocolSuite extends FunSuite {
       |{"Event":"SparkListenerTaskGettingResult","Task Info":
       |  {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor",
       |   "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0,
-      |   "Finish Time":0,"Failed":false,"Serialized Size":0
+      |   "Finish Time":0,"Failed":false
       |  }
       |}
     """.stripMargin
@@ -580,7 +576,7 @@ class JsonProtocolSuite extends FunSuite {
       |"Task Info":{
       |  "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
       |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
-      |  "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+      |  "Getting Result Time":0,"Finish Time":0,"Failed":false
       |},
       |"Task Metrics":{
       |  "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
@@ -620,7 +616,7 @@ class JsonProtocolSuite extends FunSuite {
       |"Task Info":{
       |  "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
       |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
-      |  "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+      |  "Getting Result Time":0,"Finish Time":0,"Failed":false
       |},
       |"Task Metrics":{
       |  "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,