Posted to commits@spark.apache.org by ma...@apache.org on 2014/05/29 00:57:10 UTC

git commit: [SPARK-1712]: TaskDescription instance is too big, causing Spark to hang

Repository: spark
Updated Branches:
  refs/heads/master 4312cf0ba -> 4dbb27b0c


[SPARK-1712]: TaskDescription instance is too big, causing Spark to hang

Author: witgo <wi...@qq.com>

Closes #694 from witgo/SPARK-1712_new and squashes the following commits:

0f52483 [witgo] review commit
83ce29b [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
52e6752 [witgo] reset test SparkContext
63636b6 [witgo] review commit
44a59ee [witgo] review commit
3b6d48c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
926bd6a [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
9a5cfad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
03cc562 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
b0930b0 [witgo] review commit
b1174bd [witgo] merge master
f76679b [witgo] merge master
689495d [witgo] fix scala style bug
1d35c3c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
062c182 [witgo] fix small bug for code style
0a428cf [witgo] add unit tests
158b2dc [witgo] review commit
4afe71d [witgo] review commit
9e4ffa7 [witgo] review commit
1d35c7d [witgo] fix hang
7965580 [witgo] fix Statement order
0e29eac [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
3ea1ca1 [witgo] remove duplicate serialize
743a7ad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
86e2048 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
2a89adc [witgo] SPARK-1712: TaskDescription instance is too big causes Spark to hang
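
For readers following along, a short summary of the change (the reasoning here is distilled from the JIRA title and the diff below, not from extra commit-message prose): previously the driver sent the TaskDescription object itself inside the Akka LaunchTask message. When a task's serialized payload exceeded the Akka frame size, remoting dropped the message, the executor never saw the task, and the job appeared to hang. The fix serializes the TaskDescription up front, aborts the task set with an explicit error when the bytes cannot fit in one frame, and otherwise ships them wrapped in a SerializableBuffer. A condensed driver-side sketch (names follow the scheduler-backend hunk below; this is an illustration, not the full code):

    // Sketch only: serialize first, then decide whether the message can be sent at all.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val serializedTask = ser.serialize(task)                  // java.nio.ByteBuffer
    if (serializedTask.limit >= akkaFrameSize - 1024) {
      // Fail fast with an actionable error instead of letting Akka drop the oversized message.
      taskSet.abort("serialized task exceeds spark.akka.frameSize")  // full message text is in the hunk below
    } else {
      executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
    }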


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4dbb27b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4dbb27b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4dbb27b0

Branch: refs/heads/master
Commit: 4dbb27b0cf4eb67c92aad2c1158616312f5a54e6
Parents: 4312cf0
Author: witgo <wi...@qq.com>
Authored: Wed May 28 15:57:05 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Wed May 28 15:57:05 2014 -0700

----------------------------------------------------------------------
 .../executor/CoarseGrainedExecutorBackend.scala |  9 ++--
 .../cluster/CoarseGrainedClusterMessage.scala   |  2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala | 27 ++++++++++--
 .../CoarseGrainedSchedulerBackendSuite.scala    | 43 ++++++++++++++++++++
 4 files changed, 73 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4dbb27b0/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 84aec65..2279d77 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,11 +22,12 @@ import java.nio.ByteBuffer
 import akka.actor._
 import akka.remote._
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.util.{AkkaUtils, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
@@ -61,12 +62,14 @@ private[spark] class CoarseGrainedExecutorBackend(
       logError("Slave registration failed: " + message)
       System.exit(1)
 
-    case LaunchTask(taskDesc) =>
-      logInfo("Got assigned task " + taskDesc.taskId)
+    case LaunchTask(data) =>
       if (executor == null) {
         logError("Received LaunchTask command but executor was null")
         System.exit(1)
       } else {
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        val taskDesc = ser.deserialize[TaskDescription](data.value)
+        logInfo("Got assigned task " + taskDesc.taskId)
         executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
       }
 

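A note on the executor side of this change: the executor must use the same closure serializer as the driver so the bytes carried in the SerializableBuffer round-trip back into a TaskDescription. A minimal round-trip sketch condensed from the driver- and executor-side hunks in this commit (assumes a SparkEnv is available in both processes):

    val ser = SparkEnv.get.closureSerializer.newInstance()
    val bytes = ser.serialize(taskDesc)                             // driver: TaskDescription -> ByteBuffer
    val wrapped = new SerializableBuffer(bytes)                     // envelope that survives Java serialization in Akka
    val restored = ser.deserialize[TaskDescription](wrapped.value)  // executor: ByteBuffer -> TaskDescription
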
http://git-wip-us.apache.org/repos/asf/spark/blob/4dbb27b0/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index ddbc74e..ca74069 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
 private[spark] object CoarseGrainedClusterMessages {
 
   // Driver to executors
-  case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
+  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
 
   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage

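Why the wrapper is needed: java.nio.ByteBuffer does not implement java.io.Serializable, so a case class carrying the raw buffer could not be shipped as an Akka message; org.apache.spark.util.SerializableBuffer wraps the buffer to make it serializable. A two-line sketch of how the driver now builds the message (mirroring the scheduler-backend hunk below; serializedTask is the ByteBuffer produced by the closure serializer):

    val msg = LaunchTask(new SerializableBuffer(serializedTask))
    executorActor(task.executorId) ! msg
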
http://git-wip-us.apache.org/repos/asf/spark/blob/4dbb27b0/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index a6d6b3d..e47a060 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,10 +27,10 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
-import org.apache.spark.{Logging, SparkException, TaskState}
+import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
 
 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -48,6 +48,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   var totalCoreCount = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
+  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
@@ -140,8 +141,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     // Launch tasks returned by a set of resource offers
     def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
-        freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
-        executorActor(task.executorId) ! LaunchTask(task)
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        val serializedTask = ser.serialize(task)
+        if (serializedTask.limit >= akkaFrameSize - 1024) {
+          val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
+          scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
+            try {
+              var msg = "Serialized task %s:%d was %d bytes which " +
+                "exceeds spark.akka.frameSize (%d bytes). " +
+                "Consider using broadcast variables for large values."
+              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize)
+              taskSet.abort(msg)
+            } catch {
+              case e: Exception => logError("Exception in error callback", e)
+            }
+          }
+        }
+        else {
+          freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
+          executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
+        }
       }
     }
 

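For users who hit the new error, two mitigations follow directly from its wording. A hedged example (the large lookup map and the frame-size value are illustrative; spark.akka.frameSize is interpreted in MB by AkkaUtils.maxFrameSizeBytes):

    // Option 1: ship large read-only data as a broadcast variable rather than inside each task closure.
    val lookup = sc.broadcast(bigLookupMap)                  // bigLookupMap: some large local Map
    val counts = rdd.map(k => lookup.value.getOrElse(k, 0)).collect()

    // Option 2: raise the Akka frame size if the tasks are legitimately large.
    val conf = new SparkConf().set("spark.akka.frameSize", "64")
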
http://git-wip-us.apache.org/repos/asf/spark/blob/4dbb27b0/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
new file mode 100644
index 0000000..efef9d2
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkContext}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils}
+
+import org.scalatest.FunSuite
+
+class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext {
+
+  test("serialized task larger than akka frame size") {
+    val conf = new SparkConf
+    conf.set("spark.akka.frameSize","1")
+    conf.set("spark.default.parallelism","1")
+    sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf)
+    val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
+    val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
+    val larger = sc.parallelize(Seq(buffer))
+    val thrown = intercept[SparkException] {
+      larger.collect()
+    }
+    assert(thrown.getMessage.contains("Consider using broadcast variables for large values"))
+    val smaller = sc.parallelize(1 to 4).collect()
+    assert(smaller.size === 4)
+  }
+
+}
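
A closing note on the new test: it uses local-cluster mode so that executors run in separate JVMs and the LaunchTask message really crosses Akka remoting; in plain local mode the coarse-grained backend, and therefore the frame-size check, is not involved. A rough way to reproduce the behavior from a small standalone driver program (a sketch; imports omitted, and the 20 MB array and cluster sizing are illustrative):

    val conf = new SparkConf().set("spark.akka.frameSize", "1")        // 1 MB frames
    val sc = new SparkContext("local-cluster[1,1,512]", "repro", conf)
    val big = new Array[Byte](20 * 1024 * 1024)                        // captured by the task closure
    sc.parallelize(1 to 2).map(_ => big.length).collect()              // now aborts with the frameSize error instead of hanging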