You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/01/14 06:02:57 UTC

spark git commit: [SPARK-12819] Deprecate TaskContext.isRunningLocally()

Repository: spark
Updated Branches:
  refs/heads/master 20d8ef858 -> e2ae7bd04


[SPARK-12819] Deprecate TaskContext.isRunningLocally()

Local execution has already been removed, but `TaskContext.isRunningLocally()` was never deprecated. Because it now always returns false, we should deprecate it for 2.0.

Author: Josh Rosen <jo...@databricks.com>

Closes #10751 from JoshRosen/remove-local-exec-from-taskcontext.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2ae7bd0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2ae7bd0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2ae7bd0

Branch: refs/heads/master
Commit: e2ae7bd046f6d8d6a375c2e81e5a51d7d78ca984
Parents: 20d8ef8
Author: Josh Rosen <jo...@databricks.com>
Authored: Wed Jan 13 21:02:54 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Jan 13 21:02:54 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/CacheManager.scala     | 5 -----
 core/src/main/scala/org/apache/spark/TaskContext.scala      | 3 ++-
 core/src/main/scala/org/apache/spark/TaskContextImpl.scala  | 3 +--
 core/src/main/scala/org/apache/spark/scheduler/Task.scala   | 3 +--
 .../src/test/scala/org/apache/spark/CacheManagerSuite.scala | 9 ---------
 5 files changed, 4 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e2ae7bd0/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 4d20c73..36b536e 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -68,11 +68,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           logInfo(s"Partition $key not found, computing it")
           val computedValues = rdd.computeOrReadCheckpoint(partition, context)
 
-          // If the task is running locally, do not persist the result
-          if (context.isRunningLocally) {
-            return computedValues
-          }
-
           // Otherwise, cache the values and keep track of any updates in block statuses
           val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
           val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)

http://git-wip-us.apache.org/repos/asf/spark/blob/e2ae7bd0/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index e25ed0f..7704abc 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -97,8 +97,9 @@ abstract class TaskContext extends Serializable {
 
   /**
    * Returns true if the task is running locally in the driver program.
-   * @return
+   * @return false
    */
+  @deprecated("Local execution was removed, so this always returns false", "2.0.0")
   def isRunningLocally(): Boolean
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e2ae7bd0/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 6c49363..94ff884 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -33,7 +33,6 @@ private[spark] class TaskContextImpl(
     override val taskMemoryManager: TaskMemoryManager,
     @transient private val metricsSystem: MetricsSystem,
     internalAccumulators: Seq[Accumulator[Long]],
-    val runningLocally: Boolean = false,
     val taskMetrics: TaskMetrics = TaskMetrics.empty)
   extends TaskContext
   with Logging {
@@ -85,7 +84,7 @@ private[spark] class TaskContextImpl(
 
   override def isCompleted(): Boolean = completed
 
-  override def isRunningLocally(): Boolean = runningLocally
+  override def isRunningLocally(): Boolean = false
 
   override def isInterrupted(): Boolean = interrupted
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e2ae7bd0/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 0379ca2..fca5792 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -74,8 +74,7 @@ private[spark] abstract class Task[T](
       attemptNumber,
       taskMemoryManager,
       metricsSystem,
-      internalAccumulators,
-      runningLocally = false)
+      internalAccumulators)
     TaskContext.setTaskContext(context)
     context.taskMetrics.setHostname(Utils.localHostName())
     context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)

http://git-wip-us.apache.org/repos/asf/spark/blob/e2ae7bd0/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index cb8bd04..30aa94c 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -82,15 +82,6 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before
     assert(value.toList === List(5, 6, 7))
   }
 
-  test("get uncached local rdd") {
-    // Local computation should not persist the resulting value, so don't expect a put().
-    when(blockManager.get(RDDBlockId(0, 0))).thenReturn(None)
-
-    val context = new TaskContextImpl(0, 0, 0, 0, null, null, Seq.empty, runningLocally = true)
-    val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
-    assert(value.toList === List(1, 2, 3, 4))
-  }
-
   test("verify task metrics updated correctly") {
     cacheManager = sc.env.cacheManager
     val context = TaskContext.empty()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org