You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/11/03 06:52:45 UTC
git commit: [SPARK-4109][CORE] Correctly deserialize Task.stageId
Repository: spark
Updated Branches:
refs/heads/branch-1.0 6d8f1dd15 -> 49224fd0f
[SPARK-4109][CORE] Correctly deserialize Task.stageId
The two subclasses of Task, ShuffleMapTask and ResultTask, do not correctly deserialize stageId: readExternal stores the value it reads in a local val named stageId instead of assigning it to the Task field. As a result, TaskContext.stageId always returns zero to the user.
Author: luluorta <lu...@gmail.com>
Closes #2971 from luluorta/fix-task-ser and squashes the following commits:
ff35ee6 [luluorta] correctly deserialize Task.stageId
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49224fd0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49224fd0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49224fd0
Branch: refs/heads/branch-1.0
Commit: 49224fd0f762374b797d7920f6dc7c88cdb19a74
Parents: 6d8f1dd
Author: luluorta <lu...@gmail.com>
Authored: Sun Nov 2 21:52:39 2014 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Nov 2 21:52:39 2014 -0800
----------------------------------------------------------------------
.../main/scala/org/apache/spark/scheduler/ResultTask.scala | 8 ++++----
.../scala/org/apache/spark/scheduler/ShuffleMapTask.scala | 8 ++++----
core/src/main/scala/org/apache/spark/scheduler/Task.scala | 2 +-
3 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/49224fd0/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 0e8d551..645c674 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -80,7 +80,7 @@ private[spark] object ResultTask {
*
* See [[org.apache.spark.scheduler.Task]] for more information.
*
- * @param stageId id of the stage this task belongs to
+ * @param _stageId id of the stage this task belongs to
* @param rdd input to func
* @param func a function to apply on a partition of the RDD
* @param _partitionId index of the number in the RDD
@@ -89,13 +89,13 @@ private[spark] object ResultTask {
* input RDD's partitions).
*/
private[spark] class ResultTask[T, U](
- stageId: Int,
+ _stageId: Int,
var rdd: RDD[T],
var func: (TaskContext, Iterator[T]) => U,
_partitionId: Int,
@transient locs: Seq[TaskLocation],
var outputId: Int)
- extends Task[U](stageId, _partitionId) with Externalizable {
+ extends Task[U](_stageId, _partitionId) with Externalizable {
def this() = this(0, null, null, 0, null, 0)
@@ -134,7 +134,7 @@ private[spark] class ResultTask[T, U](
}
override def readExternal(in: ObjectInput) {
- val stageId = in.readInt()
+ stageId = in.readInt()
val numBytes = in.readInt()
val bytes = new Array[Byte](numBytes)
in.readFully(bytes)
http://git-wip-us.apache.org/repos/asf/spark/blob/49224fd0/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index ed0f56f..9cc498c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -90,19 +90,19 @@ private[spark] object ShuffleMapTask {
*
* See [[org.apache.spark.scheduler.Task]] for more information.
*
- * @param stageId id of the stage this task belongs to
+ * @param _stageId id of the stage this task belongs to
* @param rdd the final RDD in this stage
* @param dep the ShuffleDependency
* @param _partitionId index of the number in the RDD
* @param locs preferred task execution locations for locality scheduling
*/
private[spark] class ShuffleMapTask(
- stageId: Int,
+ _stageId: Int,
var rdd: RDD[_],
var dep: ShuffleDependency[_,_],
_partitionId: Int,
@transient private var locs: Seq[TaskLocation])
- extends Task[MapStatus](stageId, _partitionId)
+ extends Task[MapStatus](_stageId, _partitionId)
with Externalizable
with Logging {
@@ -128,7 +128,7 @@ private[spark] class ShuffleMapTask(
}
override def readExternal(in: ObjectInput) {
- val stageId = in.readInt()
+ stageId = in.readInt()
val numBytes = in.readInt()
val bytes = new Array[Byte](numBytes)
in.readFully(bytes)
http://git-wip-us.apache.org/repos/asf/spark/blob/49224fd0/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 5871ede..a81af57 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -40,7 +40,7 @@ import org.apache.spark.util.ByteBufferInputStream
* @param stageId id of the stage this task belongs to
* @param partitionId index of the number in the RDD
*/
-private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
+private[spark] abstract class Task[T](var stageId: Int, var partitionId: Int) extends Serializable {
final def run(attemptId: Long): T = {
context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org