You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2014/07/17 06:30:58 UTC
git commit: [SPARK-1112, 2156] (0.9 edition) Use correct akka frame size and overhead amounts.
Repository: spark
Updated Branches:
refs/heads/branch-0.9 0116dee7e -> 7edee34f3
[SPARK-1112, 2156] (0.9 edition) Use correct akka frame size and overhead amounts.
backport #1172 to branch-0.9.
Author: Patrick Wendell <pw...@gmail.com>
Closes #1455 from mengxr/akka-fix-0.9 and squashes the following commits:
a99f201 [Patrick Wendell] backport PR #1172 to branch-0.9
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7edee34f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7edee34f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7edee34f
Branch: refs/heads/branch-0.9
Commit: 7edee34f33e374cec05a83be66ad50e5672ebf65
Parents: 0116dee
Author: Patrick Wendell <pw...@gmail.com>
Authored: Wed Jul 16 21:30:50 2014 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Wed Jul 16 21:30:50 2014 -0700
----------------------------------------------------------------------
.../executor/CoarseGrainedExecutorBackend.scala | 9 +++++++--
.../org/apache/spark/executor/Executor.scala | 8 +++-----
.../apache/spark/executor/ExecutorBackend.scala | 3 +++
.../cluster/CoarseGrainedSchedulerBackend.scala | 2 +-
.../spark/scheduler/local/LocalBackend.scala | 8 ++++++--
.../scala/org/apache/spark/util/AkkaUtils.scala | 3 +++
.../apache/spark/MapOutputTrackerSuite.scala | 21 +++++++++++---------
7 files changed, 35 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7edee34f/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index be46615..920c6ec 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -34,7 +34,8 @@ private[spark] class CoarseGrainedExecutorBackend(
driverUrl: String,
executorId: String,
hostPort: String,
- cores: Int)
+ cores: Int,
+ actorSystem: ActorSystem)
extends Actor
with ExecutorBackend
with Logging {
@@ -93,6 +94,9 @@ private[spark] class CoarseGrainedExecutorBackend(
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
driver ! StatusUpdate(executorId, taskId, state, data)
}
+
+ override def akkaFrameSize() = actorSystem.settings.config.getBytes(
+ "akka.remote.netty.tcp.maximum-frame-size")
}
private[spark] object CoarseGrainedExecutorBackend {
@@ -110,7 +114,8 @@ private[spark] object CoarseGrainedExecutorBackend {
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
- Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
+ Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores,
+ actorSystem),
name = "Executor")
workerUrl.foreach {
url =>
http://git-wip-us.apache.org/repos/asf/spark/blob/7edee34f/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 89907e5..989806c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -117,10 +117,6 @@ private[spark] class Executor(
}
}
- // Akka's message frame size. If task result is bigger than this, we use the block manager
- // to send the result back.
- private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-
// Start worker thread pool
val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
@@ -232,8 +228,10 @@ private[spark] class Executor(
val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
val serializedDirectResult = ser.serialize(directResult)
logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
+
val serializedResult = {
- if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
+ if (serializedDirectResult.limit >= execBackend.akkaFrameSize() -
+ AkkaUtils.reservedSizeBytes) {
logInfo("Storing result for " + taskId + " in local BlockManager")
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
http://git-wip-us.apache.org/repos/asf/spark/blob/7edee34f/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
index ad7dd34..9947403 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
@@ -25,4 +25,7 @@ import org.apache.spark.TaskState.TaskState
*/
private[spark] trait ExecutorBackend {
def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
+
+ // Exists as a work around for SPARK-1112. This only exists in branch-1.x of Spark.
+ def akkaFrameSize(): Long = Long.MaxValue
}
http://git-wip-us.apache.org/repos/asf/spark/blob/7edee34f/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 85644c5..9735c42 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -142,7 +142,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
for (task <- tasks.flatten) {
val ser = SparkEnv.get.closureSerializer.newInstance()
val serializedTask = ser.serialize(task)
- if (serializedTask.limit >= akkaFrameSize - 1024) {
+ if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
try {
http://git-wip-us.apache.org/repos/asf/spark/blob/7edee34f/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 897d47a..21ef896 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -20,11 +20,11 @@ package org.apache.spark.scheduler.local
import java.nio.ByteBuffer
import akka.actor.{Actor, ActorRef, Props}
-
-import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
+import org.apache.spark.{Logging, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.util.AkkaUtils
private case class ReviveOffers()
@@ -106,4 +106,8 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
localActor ! StatusUpdate(taskId, state, serializedData)
}
+
+ // This limit is calculated only to preserve expected behavior in tests. In reality, since this
+ // backend sends messages over the existing actor system, there is no need to enforce a limit.
+ override def akkaFrameSize() = AkkaUtils.maxFrameSizeBytes(scheduler.sc.getConf)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/7edee34f/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 4dcf39b..7301e54 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -115,4 +115,7 @@ private[spark] object AkkaUtils {
def maxFrameSizeBytes(conf: SparkConf): Int = {
conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
}
+
+ /** Space reserved for extra data in an Akka message besides serialized task or task result. */
+ val reservedSizeBytes = 200 * 1024
}
http://git-wip-us.apache.org/repos/asf/spark/blob/7edee34f/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index b6e064c..ac341b3 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -161,7 +161,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("remote fetch exceeds akka frame size") {
val newConf = new SparkConf
- newConf.set("spark.akka.frameSize", "1")
newConf.set("spark.akka.askTimeout", "1") // Fail fast
val masterTracker = new MapOutputTrackerMaster(conf)
@@ -170,14 +169,18 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
val masterActor = actorRef.underlyingActor
- // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
- // Note that the size is hand-selected here because map output statuses are compressed before
- // being sent.
- masterTracker.registerShuffle(20, 100)
- (0 until 100).foreach { i =>
- masterTracker.registerMapOutput(20, i, new MapStatus(
- BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
+ // Frame size should be 2 * the configured frame size, and MapOutputTrackerMasterActor should
+ // throw exception.
+ val shuffleId = 20
+ val numMaps = 2
+ val data = new Array[Byte](AkkaUtils.maxFrameSizeBytes(conf))
+ val random = new java.util.Random(0)
+ random.nextBytes(data) // Make it hard to compress.
+ masterTracker.registerShuffle(shuffleId, numMaps)
+ (0 until numMaps).foreach { i =>
+ masterTracker.registerMapOutput(shuffleId, i, new MapStatus(
+ BlockManagerId("999", "mps", 1000, 0), data))
}
- intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
+ intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(shuffleId)) }
}
}