You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/09/27 23:46:06 UTC

git commit: [SPARK-3543] Clean up Java TaskContext implementation.

Repository: spark
Updated Branches:
  refs/heads/master 0d8cdf0ed -> 5b922bb45


[SPARK-3543] Clean up Java TaskContext implementation.

This addresses some minor issues in https://github.com/apache/spark/pull/2425

Author: Reynold Xin <rx...@apache.org>

Closes #2557 from rxin/TaskContext and squashes the following commits:

a51e5f6 [Reynold Xin] [SPARK-3543] Clean up Java TaskContext implementation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b922bb4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b922bb4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b922bb4

Branch: refs/heads/master
Commit: 5b922bb458e863f5be0ae68167de882743f70b86
Parents: 0d8cdf0
Author: Reynold Xin <rx...@apache.org>
Authored: Sat Sep 27 14:46:00 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Sep 27 14:46:00 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/spark/TaskContext.java | 33 +++++++++-----------
 .../apache/spark/scheduler/DAGScheduler.scala   |  2 +-
 .../org/apache/spark/scheduler/ResultTask.scala |  6 +---
 .../apache/spark/scheduler/ShuffleMapTask.scala |  2 --
 .../scala/org/apache/spark/scheduler/Task.scala |  8 +++--
 5 files changed, 22 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5b922bb4/core/src/main/java/org/apache/spark/TaskContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/TaskContext.java b/core/src/main/java/org/apache/spark/TaskContext.java
index 09b8ce0..4e6d708 100644
--- a/core/src/main/java/org/apache/spark/TaskContext.java
+++ b/core/src/main/java/org/apache/spark/TaskContext.java
@@ -56,7 +56,7 @@ public class TaskContext implements Serializable {
    * @param taskMetrics performance metrics of the task
    */
   @DeveloperApi
-  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally,
+  public TaskContext(int stageId, int partitionId, long attemptId, boolean runningLocally,
                      TaskMetrics taskMetrics) {
     this.attemptId = attemptId;
     this.partitionId = partitionId;
@@ -65,7 +65,6 @@ public class TaskContext implements Serializable {
     this.taskMetrics = taskMetrics;
   }
 
-
   /**
    * :: DeveloperApi ::
    * Contextual information about a task which can be read or mutated during execution.
@@ -76,8 +75,7 @@ public class TaskContext implements Serializable {
    * @param runningLocally whether the task is running locally in the driver JVM
    */
   @DeveloperApi
-  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
-                     Boolean runningLocally) {
+  public TaskContext(int stageId, int partitionId, long attemptId, boolean runningLocally) {
     this.attemptId = attemptId;
     this.partitionId = partitionId;
     this.runningLocally = runningLocally;
@@ -85,7 +83,6 @@ public class TaskContext implements Serializable {
     this.taskMetrics = TaskMetrics.empty();
   }
 
-
   /**
    * :: DeveloperApi ::
    * Contextual information about a task which can be read or mutated during execution.
@@ -95,7 +92,7 @@ public class TaskContext implements Serializable {
    * @param attemptId the number of attempts to execute this task
    */
   @DeveloperApi
-  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) {
+  public TaskContext(int stageId, int partitionId, long attemptId) {
     this.attemptId = attemptId;
     this.partitionId = partitionId;
     this.runningLocally = false;
@@ -107,9 +104,9 @@ public class TaskContext implements Serializable {
     new ThreadLocal<TaskContext>();
 
   /**
-  * :: Internal API ::
-  * This is spark internal API, not intended to be called from user programs.
-  */
+   * :: Internal API ::
+   * This is spark internal API, not intended to be called from user programs.
+   */
   public static void setTaskContext(TaskContext tc) {
     taskContext.set(tc);
   }
@@ -118,10 +115,8 @@ public class TaskContext implements Serializable {
     return taskContext.get();
   }
 
-  /** 
-  * :: Internal API ::
-  */
-  public static void remove() {
+  /** :: Internal API ::  */
+  public static void unset() {
     taskContext.remove();
   }
 
@@ -130,22 +125,22 @@ public class TaskContext implements Serializable {
     new ArrayList<TaskCompletionListener>();
 
   // Whether the corresponding task has been killed.
-  private volatile Boolean interrupted = false;
+  private volatile boolean interrupted = false;
 
   // Whether the task has completed.
-  private volatile Boolean completed = false;
+  private volatile boolean completed = false;
 
   /**
    * Checks whether the task has completed.
    */
-  public Boolean isCompleted() {
+  public boolean isCompleted() {
     return completed;
   }
 
   /**
    * Checks whether the task has been killed.
    */
-  public Boolean isInterrupted() {
+  public boolean isInterrupted() {
     return interrupted;
   }
 
@@ -246,12 +241,12 @@ public class TaskContext implements Serializable {
   }
 
   @Deprecated
-  /** Deprecated: use getRunningLocally() */
+  /** Deprecated: use isRunningLocally() */
   public boolean runningLocally() {
     return runningLocally;
   }
 
-  public boolean getRunningLocally() {
+  public boolean isRunningLocally() {
     return runningLocally;
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5b922bb4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 32cf29e..70c235d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -641,7 +641,7 @@ class DAGScheduler(
         job.listener.taskSucceeded(0, result)
       } finally {
         taskContext.markTaskCompleted()
-        TaskContext.remove()
+        TaskContext.unset()
       }
     } catch {
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/spark/blob/5b922bb4/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 2ccbd8e..4a9ff91 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -58,11 +58,7 @@ private[spark] class ResultTask[T, U](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
 
     metrics = Some(context.taskMetrics)
-    try {
-      func(context, rdd.iterator(partition, context))
-    } finally {
-      context.markTaskCompleted()
-    }
+    func(context, rdd.iterator(partition, context))
   }
 
   // This is only callable on the driver side.

http://git-wip-us.apache.org/repos/asf/spark/blob/5b922bb4/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index a98ee11..7970908 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -78,8 +78,6 @@ private[spark] class ShuffleMapTask(
             log.debug("Could not stop writer", e)
         }
         throw e
-    } finally {
-      context.markTaskCompleted()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5b922bb4/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index bf73f6f..c6e47c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -52,7 +52,12 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
     if (_killed) {
       kill(interruptThread = false)
     }
-    runTask(context)
+    try {
+      runTask(context)
+    } finally {
+      context.markTaskCompleted()
+      TaskContext.unset()
+    }
   }
 
   def runTask(context: TaskContext): T
@@ -93,7 +98,6 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
     if (interruptThread && taskThread != null) {
       taskThread.interrupt()
     }
-    TaskContext.remove()
   }  
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org