Posted to commits@spark.apache.org by rx...@apache.org on 2014/11/03 06:52:45 UTC

git commit: [SPARK-4109][CORE] Correctly deserialize Task.stageId

Repository: spark
Updated Branches:
  refs/heads/branch-1.0 6d8f1dd15 -> 49224fd0f


[SPARK-4109][CORE] Correctly deserialize Task.stageId

The two subclasses of Task, ShuffleMapTask and ResultTask, do not correctly deserialize stageId. As a result, accessing TaskContext.stageId always returns zero to the user.
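For context, a minimal, self-contained sketch of the shadowing bug (the class names below are simplified stand-ins, not the real Spark scheduler types): Java deserialization of an Externalizable first invokes the no-arg constructor, which leaves the field at 0, and the old readExternal declared a local val named stageId instead of assigning the field, so the deserialized task kept stageId == 0.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, Externalizable,
  ObjectInput, ObjectInputStream, ObjectOutput, ObjectOutputStream}

// Illustrative stand-in for Task: stageId is a val, as before this patch.
abstract class SketchTask(val stageId: Int) extends Serializable

// Illustrative stand-in for ResultTask/ShuffleMapTask.
class BuggySketchTask(_stageId: Int)
  extends SketchTask(_stageId) with Externalizable {

  // Deserialization of an Externalizable calls this no-arg constructor
  // first, initializing the inherited field to 0.
  def this() = this(0)

  override def writeExternal(out: ObjectOut put): Unit = out.writeInt(stageId)

  override def readExternal(in: ObjectInput): Unit = {
    // BUG: `val` declares a new local that shadows the field, so the
    // 0 set by `this(0)` is never overwritten. (With a val field on the
    // base class, the field could not be assigned here anyway.)
    val stageId = in.readInt()
  }
}

object ShadowingDemo {
  private def roundTrip[T](obj: T): T = {
    val buf = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(buf)
    oos.writeObject(obj)
    oos.close()
    new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val restored = roundTrip(new BuggySketchTask(42))
    println(restored.stageId)  // prints 0, not 42
  }
}

The patch below fixes this by making stageId a var on Task and assigning the field directly in each readExternal.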

Author: luluorta <lu...@gmail.com>

Closes #2971 from luluorta/fix-task-ser and squashes the following commits:

ff35ee6 [luluorta] correctly deserialize Task.stageId


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49224fd0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49224fd0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49224fd0

Branch: refs/heads/branch-1.0
Commit: 49224fd0f762374b797d7920f6dc7c88cdb19a74
Parents: 6d8f1dd
Author: luluorta <lu...@gmail.com>
Authored: Sun Nov 2 21:52:39 2014 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Nov 2 21:52:39 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/scheduler/ResultTask.scala   | 8 ++++----
 .../scala/org/apache/spark/scheduler/ShuffleMapTask.scala    | 8 ++++----
 core/src/main/scala/org/apache/spark/scheduler/Task.scala    | 2 +-
 3 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/49224fd0/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 0e8d551..645c674 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -80,7 +80,7 @@ private[spark] object ResultTask {
  *
  * See [[org.apache.spark.scheduler.Task]] for more information.
  *
- * @param stageId id of the stage this task belongs to
+ * @param _stageId id of the stage this task belongs to
  * @param rdd input to func
  * @param func a function to apply on a partition of the RDD
  * @param _partitionId index of the number in the RDD
@@ -89,13 +89,13 @@ private[spark] object ResultTask {
  *                 input RDD's partitions).
  */
 private[spark] class ResultTask[T, U](
-    stageId: Int,
+    _stageId: Int,
     var rdd: RDD[T],
     var func: (TaskContext, Iterator[T]) => U,
     _partitionId: Int,
     @transient locs: Seq[TaskLocation],
     var outputId: Int)
-  extends Task[U](stageId, _partitionId) with Externalizable {
+  extends Task[U](_stageId, _partitionId) with Externalizable {
 
   def this() = this(0, null, null, 0, null, 0)
 
@@ -134,7 +134,7 @@ private[spark] class ResultTask[T, U](
   }
 
   override def readExternal(in: ObjectInput) {
-    val stageId = in.readInt()
+    stageId = in.readInt()
     val numBytes = in.readInt()
     val bytes = new Array[Byte](numBytes)
     in.readFully(bytes)

http://git-wip-us.apache.org/repos/asf/spark/blob/49224fd0/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index ed0f56f..9cc498c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -90,19 +90,19 @@ private[spark] object ShuffleMapTask {
  *
  * See [[org.apache.spark.scheduler.Task]] for more information.
  *
- * @param stageId id of the stage this task belongs to
+ * @param _stageId id of the stage this task belongs to
  * @param rdd the final RDD in this stage
  * @param dep the ShuffleDependency
  * @param _partitionId index of the number in the RDD
  * @param locs preferred task execution locations for locality scheduling
  */
 private[spark] class ShuffleMapTask(
-    stageId: Int,
+    _stageId: Int,
     var rdd: RDD[_],
     var dep: ShuffleDependency[_,_],
     _partitionId: Int,
     @transient private var locs: Seq[TaskLocation])
-  extends Task[MapStatus](stageId, _partitionId)
+  extends Task[MapStatus](_stageId, _partitionId)
   with Externalizable
   with Logging {
 
@@ -128,7 +128,7 @@ private[spark] class ShuffleMapTask(
   }
 
   override def readExternal(in: ObjectInput) {
-    val stageId = in.readInt()
+    stageId = in.readInt()
     val numBytes = in.readInt()
     val bytes = new Array[Byte](numBytes)
     in.readFully(bytes)

http://git-wip-us.apache.org/repos/asf/spark/blob/49224fd0/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 5871ede..a81af57 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -40,7 +40,7 @@ import org.apache.spark.util.ByteBufferInputStream
  * @param stageId id of the stage this task belongs to
  * @param partitionId index of the number in the RDD
  */
-private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
+private[spark] abstract class Task[T](var stageId: Int, var partitionId: Int) extends Serializable {
 
   final def run(attemptId: Long): T = {
     context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)

