You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/02/03 09:46:07 UTC

spark git commit: [SPARK-5549] Define TaskContext interface in Scala.

Repository: spark
Updated Branches:
  refs/heads/master 523a93523 -> bebf4c42b


[SPARK-5549] Define TaskContext interface in Scala.

So the interface documentation shows up in ScalaDoc.

Author: Reynold Xin <rx...@databricks.com>

Closes #4324 from rxin/TaskContext-scala and squashes the following commits:

2480a17 [Reynold Xin] comment
573756f [Reynold Xin] style fixes and javadoc fixes.
87dd537 [Reynold Xin] [SPARK-5549] Define TaskContext interface in Scala.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bebf4c42
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bebf4c42
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bebf4c42

Branch: refs/heads/master
Commit: bebf4c42bef3e75d31ffce9bfdb331c16f34ddb1
Parents: 523a935
Author: Reynold Xin <rx...@databricks.com>
Authored: Tue Feb 3 00:46:04 2015 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Feb 3 00:46:04 2015 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/spark/TaskContext.java | 126 -----------------
 .../scala/org/apache/spark/TaskContext.scala    | 136 +++++++++++++++++++
 .../org/apache/spark/TaskContextImpl.scala      |   8 +-
 .../util/JavaTaskCompletionListenerImpl.java    |  38 ------
 .../spark/JavaTaskCompletionListenerImpl.java   |  39 ++++++
 .../spark/JavaTaskContextCompileCheck.java      |  41 ++++++
 6 files changed, 220 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bebf4c42/core/src/main/java/org/apache/spark/TaskContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/TaskContext.java b/core/src/main/java/org/apache/spark/TaskContext.java
deleted file mode 100644
index 095f9fb..0000000
--- a/core/src/main/java/org/apache/spark/TaskContext.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark;
-
-import java.io.Serializable;
-
-import scala.Function0;
-import scala.Function1;
-import scala.Unit;
-
-import org.apache.spark.annotation.DeveloperApi;
-import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.util.TaskCompletionListener;
-
-/**
- * Contextual information about a task which can be read or mutated during
- * execution. To access the TaskContext for a running task use
- * TaskContext.get().
- */
-public abstract class TaskContext implements Serializable {
-  /**
-   * Return the currently active TaskContext. This can be called inside of
-   * user functions to access contextual information about running tasks.
-   */
-  public static TaskContext get() {
-    return taskContext.get();
-  }
-
-  private static ThreadLocal<TaskContext> taskContext =
-    new ThreadLocal<TaskContext>();
-
-  static void setTaskContext(TaskContext tc) {
-    taskContext.set(tc);
-  }
-
-  static void unset() {
-    taskContext.remove();
-  }
-
-  /**
-   * Whether the task has completed.
-   */
-  public abstract boolean isCompleted();
-
-  /**
-   * Whether the task has been killed.
-   */
-  public abstract boolean isInterrupted();
-
-  /** @deprecated use {@link #isRunningLocally()} */
-  @Deprecated
-  public abstract boolean runningLocally();
-
-  public abstract boolean isRunningLocally();
-
-  /**
-   * Add a (Java friendly) listener to be executed on task completion.
-   * This will be called in all situation - success, failure, or cancellation.
-   * An example use is for HadoopRDD to register a callback to close the input stream.
-   */
-  public abstract TaskContext addTaskCompletionListener(TaskCompletionListener listener);
-
-  /**
-   * Add a listener in the form of a Scala closure to be executed on task completion.
-   * This will be called in all situations - success, failure, or cancellation.
-   * An example use is for HadoopRDD to register a callback to close the input stream.
-   */
-  public abstract TaskContext addTaskCompletionListener(final Function1<TaskContext, Unit> f);
-
-  /**
-   * Add a callback function to be executed on task completion. An example use
-   * is for HadoopRDD to register a callback to close the input stream.
-   * Will be called in any situation - success, failure, or cancellation.
-   *
-   * @deprecated use {@link #addTaskCompletionListener(scala.Function1)}
-   *
-   * @param f Callback function.
-   */
-  @Deprecated
-  public abstract void addOnCompleteCallback(final Function0<Unit> f);
-
-  /**
-   * The ID of the stage that this task belong to.
-   */
-  public abstract int stageId();
-
-  /**
-   * The ID of the RDD partition that is computed by this task.
-   */
-  public abstract int partitionId();
-
-  /**
-   * How many times this task has been attempted.  The first task attempt will be assigned
-   * attemptNumber = 0, and subsequent attempts will have increasing attempt numbers.
-   */
-  public abstract int attemptNumber();
-
-  /** @deprecated use {@link #taskAttemptId()}; it was renamed to avoid ambiguity. */
-  @Deprecated
-  public abstract long attemptId();
-
-  /**
-   * An ID that is unique to this task attempt (within the same SparkContext, no two task attempts
-   * will share the same attempt ID).  This is roughly equivalent to Hadoop's TaskAttemptID.
-   */
-  public abstract long taskAttemptId();
-
-  /** ::DeveloperApi:: */
-  @DeveloperApi
-  public abstract TaskMetrics taskMetrics();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/bebf4c42/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
new file mode 100644
index 0000000..af9c138
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.Serializable
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.util.TaskCompletionListener
+
+
+object TaskContext {
+  /**
+   * Return the TaskContext bound to the calling thread, or null if none has been set. This can
+   * be called inside of user functions to access contextual information about running tasks.
+   */
+  def get(): TaskContext = taskContext.get
+
+  private val taskContext: ThreadLocal[TaskContext] = new ThreadLocal[TaskContext]
+
+  // Note: protected[spark] instead of private[spark] to prevent the following two from
+  // showing up in JavaDoc.
+  /**
+   * Bind `tc` as the TaskContext of the calling thread. Internal to Spark.
+   */
+  protected[spark] def setTaskContext(tc: TaskContext): Unit = taskContext.set(tc)
+
+  /**
+   * Remove the calling thread's TaskContext binding. Internal to Spark.
+   */
+  protected[spark] def unset(): Unit = taskContext.remove()
+}
+
+
+/**
+ * Contextual information about a task which can be read or mutated during
+ * execution. To access the TaskContext for a running task, use:
+ * {{{
+ *   org.apache.spark.TaskContext.get()
+ * }}}
+ */
+abstract class TaskContext extends Serializable {
+  // Note: TaskContext must NOT define a get method. Otherwise it will prevent the Scala compiler
+  // from generating a static get method (based on the companion object's get method).
+
+  // Note: Update JavaTaskContextCompileCheck when new methods are added to this class.
+
+  // Note: getters in this class are defined with parentheses to maintain backward compatibility.
+
+  /**
+   * Returns true if the task has completed.
+   */
+  def isCompleted(): Boolean
+
+  /**
+   * Returns true if the task has been killed.
+   */
+  def isInterrupted(): Boolean
+
+  @deprecated("use isRunningLocally", "1.2.0")
+  def runningLocally(): Boolean
+
+  /**
+   * Returns true if the task is running locally in the driver program.
+   * @return true when the task is running locally in the driver program, false otherwise.
+   */
+  def isRunningLocally(): Boolean
+
+  /**
+   * Adds a (Java friendly) listener to be executed on task completion.
+   * This will be called in all situations - success, failure, or cancellation.
+   * An example use is for HadoopRDD to register a callback to close the input stream.
+   */
+  def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext
+
+  /**
+   * Adds a listener in the form of a Scala closure to be executed on task completion.
+   * This will be called in all situations - success, failure, or cancellation.
+   * An example use is for HadoopRDD to register a callback to close the input stream.
+   */
+  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext
+
+  /**
+   * Adds a callback function to be executed on task completion. An example use
+   * is for HadoopRDD to register a callback to close the input stream.
+   * Will be called in any situation - success, failure, or cancellation.
+   *
+   * @param f Callback function.
+   */
+  @deprecated("use addTaskCompletionListener", "1.2.0")
+  def addOnCompleteCallback(f: () => Unit): Unit
+
+  /**
+   * The ID of the stage that this task belongs to.
+   */
+  def stageId(): Int
+
+  /**
+   * The ID of the RDD partition that is computed by this task.
+   */
+  def partitionId(): Int
+
+  /**
+   * How many times this task has been attempted.  The first task attempt will be assigned
+   * attemptNumber = 0, and subsequent attempts will have increasing attempt numbers.
+   */
+  def attemptNumber(): Int
+
+  @deprecated("use taskAttemptId; it was renamed to avoid ambiguity", "1.3.0")
+  def attemptId(): Long
+
+  /**
+   * An ID that is unique to this task attempt (within the same SparkContext, no two task attempts
+   * will share the same attempt ID).  This is roughly equivalent to Hadoop's TaskAttemptID.
+   */
+  def taskAttemptId(): Long
+
+  /** ::DeveloperApi:: */
+  @DeveloperApi
+  def taskMetrics(): TaskMetrics
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bebf4c42/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 9bb0c61..337c8e4 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -33,7 +33,7 @@ private[spark] class TaskContextImpl(
   with Logging {
 
   // For backwards-compatibility; this method is now deprecated as of 1.3.0.
-  override def attemptId: Long = taskAttemptId
+  override def attemptId(): Long = taskAttemptId
 
   // List of callback functions to execute when the task completes.
   @transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]
@@ -87,10 +87,10 @@ private[spark] class TaskContextImpl(
     interrupted = true
   }
 
-  override def isCompleted: Boolean = completed
+  override def isCompleted(): Boolean = completed
 
-  override def isRunningLocally: Boolean = runningLocally
+  override def isRunningLocally(): Boolean = runningLocally
 
-  override def isInterrupted: Boolean = interrupted
+  override def isInterrupted(): Boolean = interrupted
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bebf4c42/core/src/test/java/org/apache/spark/util/JavaTaskCompletionListenerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/JavaTaskCompletionListenerImpl.java b/core/src/test/java/org/apache/spark/util/JavaTaskCompletionListenerImpl.java
deleted file mode 100644
index e9ec700..0000000
--- a/core/src/test/java/org/apache/spark/util/JavaTaskCompletionListenerImpl.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util;
-
-import org.apache.spark.TaskContext;
-
-
-/**
- * A simple implementation of TaskCompletionListener that makes sure TaskCompletionListener and
- * TaskContext is Java friendly.
- */
-public class JavaTaskCompletionListenerImpl implements TaskCompletionListener {
-
-  @Override
-  public void onTaskCompletion(TaskContext context) {
-    context.isCompleted();
-    context.isInterrupted();
-    context.stageId();
-    context.partitionId();
-    context.isRunningLocally();
-    context.addTaskCompletionListener(this);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/bebf4c42/core/src/test/java/test/org/apache/spark/JavaTaskCompletionListenerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/test/org/apache/spark/JavaTaskCompletionListenerImpl.java b/core/src/test/java/test/org/apache/spark/JavaTaskCompletionListenerImpl.java
new file mode 100644
index 0000000..e38bc38
--- /dev/null
+++ b/core/src/test/java/test/org/apache/spark/JavaTaskCompletionListenerImpl.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark;
+
+import org.apache.spark.TaskContext;
+import org.apache.spark.util.TaskCompletionListener;
+
+
+/**
+ * A simple Java implementation of TaskCompletionListener that makes sure TaskCompletionListener
+ * and TaskContext are Java friendly. The results of the calls below are unused.
+ */
+public class JavaTaskCompletionListenerImpl implements TaskCompletionListener {
+
+  @Override
+  public void onTaskCompletion(TaskContext context) {
+    context.isCompleted();
+    context.isInterrupted();
+    context.stageId();
+    context.partitionId();
+    context.isRunningLocally();
+    context.addTaskCompletionListener(this);  // the Java-friendly listener overload
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bebf4c42/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
new file mode 100644
index 0000000..4a918f7
--- /dev/null
+++ b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark;
+
+import org.apache.spark.TaskContext;
+
+/**
+ * Compile check that the TaskContext API can be called from Java; call results are unused.
+ */
+public class JavaTaskContextCompileCheck {
+
+  public static void test() {
+    TaskContext tc = TaskContext.get();  // get() is defined on the Scala companion object
+
+    tc.isCompleted();
+    tc.isInterrupted();
+    tc.isRunningLocally();
+
+    tc.addTaskCompletionListener(new JavaTaskCompletionListenerImpl());
+
+    tc.attemptNumber();
+    tc.partitionId();
+    tc.stageId();
+    tc.taskAttemptId();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org