Posted to commits@spark.apache.org by we...@apache.org on 2017/08/14 14:48:20 UTC
spark git commit: [SPARK-21563][CORE] Fix race condition when serializing TaskDescriptions and adding jars
Repository: spark
Updated Branches:
refs/heads/master 34d2134a9 -> 6847e93cf
[SPARK-21563][CORE] Fix race condition when serializing TaskDescriptions and adding jars
## What changes were proposed in this pull request?
Fix a race condition between serializing TaskDescriptions and adding jars by keeping the set of jars and files for a TaskSet constant across the lifetime of the TaskSet. Without this, serializing a TaskDescription can produce corrupt output when new files or jars are added concurrently while the TaskDescription is being serialized.
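To make the failure mode concrete, here is a minimal sketch (illustrative names only, not Spark's exact internals) of why serializing a mutable map that another thread is mutating is unsafe, and how taking a snapshot up front avoids it:

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import scala.collection.mutable

object SerializationRaceSketch {
  // Shared state, mutated whenever the driver calls addJar()/addFile().
  val addedJars = mutable.HashMap[String, Long]()

  // Writes the entry count first, then the entries. If another thread adds an
  // entry between writeInt and the foreach, the count disagrees with the
  // entries actually written, and the stream is corrupt on the decode side.
  def serializeUnsafely(jars: mutable.Map[String, Long]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(jars.size)
    jars.foreach { case (name, timestamp) =>
      out.writeUTF(name)
      out.writeLong(timestamp)
    }
    out.flush()
    bytes.toByteArray
  }

  // The fix in spirit: take one consistent copy when the TaskSetManager is
  // created, and serialize only that copy for every task in the TaskSet.
  val jarsForThisTaskSet: Map[String, Long] = Map(addedJars.toSeq: _*)
}
```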
## How was this patch tested?
An additional unit test ensures that the jars and files contained in the TaskDescription remain constant throughout the lifetime of the TaskSet.
Author: Andrew Ash <an...@andrewash.com>
Closes #18913 from ash211/SPARK-21563.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6847e93c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6847e93c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6847e93c
Branch: refs/heads/master
Commit: 6847e93cf427aa971dac1ea261c1443eebf4089e
Parents: 34d2134
Author: Andrew Ash <an...@andrewash.com>
Authored: Mon Aug 14 22:48:08 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Aug 14 22:48:08 2017 +0800
----------------------------------------------------------------------
.../scala/org/apache/spark/SparkContext.scala | 7 ++++
.../apache/spark/scheduler/TaskSetManager.scala | 8 +++--
.../spark/scheduler/TaskSetManagerSuite.scala | 34 +++++++++++++++++++-
3 files changed, 46 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6847e93c/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5316468..136f0af 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1490,6 +1490,8 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Add a file to be downloaded with this Spark job on every node.
*
+ * If a file is added during execution, it will not be available until the next TaskSet starts.
+ *
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(fileName)` to find its download location.
@@ -1506,6 +1508,8 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Add a file to be downloaded with this Spark job on every node.
*
+ * If a file is added during execution, it will not be available until the next TaskSet starts.
+ *
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(fileName)` to find its download location.
@@ -1792,6 +1796,9 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Adds a JAR dependency for all tasks to be executed on this `SparkContext` in the future.
+ *
+ * If a jar is added during execution, it will not be available until the next TaskSet starts.
+ *
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems),
* an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
*/
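A hypothetical driver-side snippet illustrating the semantics documented above (file path, jar path, and app name are made up for illustration): a file added before a job runs is resolvable via `SparkFiles.get` on executors, while anything added mid-stage only becomes visible to TaskSets created afterwards.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("addFile-demo"))

sc.addFile("/tmp/lookup.txt")  // hypothetical local file; HDFS/HTTP URIs also work

sc.parallelize(1 to 4).foreach { _ =>
  // On an executor, resolve the file's downloaded location by file name.
  val localPath = SparkFiles.get("lookup.txt")
  println(localPath)
}

// A jar added here is NOT picked up by any TaskSet already in flight;
// only TaskSets created after this call will ship it.
sc.addJar("/tmp/extra.jar")
```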
http://git-wip-us.apache.org/repos/asf/spark/blob/6847e93c/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 589fe67..c251071 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -56,6 +56,10 @@ private[spark] class TaskSetManager(
private val conf = sched.sc.conf
+ // SPARK-21563 make a copy of the jars/files so they are consistent across the TaskSet
+ private val addedJars = HashMap[String, Long](sched.sc.addedJars.toSeq: _*)
+ private val addedFiles = HashMap[String, Long](sched.sc.addedFiles.toSeq: _*)
+
// Quantile of tasks at which to start speculation
val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)
@@ -502,8 +506,8 @@ private[spark] class TaskSetManager(
execId,
taskName,
index,
- sched.sc.addedFiles,
- sched.sc.addedJars,
+ addedFiles,
+ addedJars,
task.localProperties,
serializedTask)
}
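The copying idiom above, `HashMap(map.toSeq: _*)`, is what makes the snapshot safe: `toSeq` materializes the shared map's entries at one point in time, and the varargs constructor builds an independent map, so later additions on the SparkContext cannot leak into this TaskSet. A standalone sketch of the idiom:

```scala
import scala.collection.mutable

val shared = mutable.HashMap[String, Long]("a.jar" -> 1L)

// Snapshot: an independent copy of the entries as they exist right now.
val snapshot = mutable.HashMap[String, Long](shared.toSeq: _*)

shared("b.jar") = 2L                 // later mutation of the shared map...
assert(!snapshot.contains("b.jar"))  // ...does not affect the snapshot
```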
http://git-wip-us.apache.org/repos/asf/spark/blob/6847e93c/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index e46900e..3696df0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.config
import org.apache.spark.internal.Logging
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{AccumulatorV2, ManualClock}
+import org.apache.spark.util.{AccumulatorV2, ManualClock, Utils}
class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
extends DAGScheduler(sc) {
@@ -1214,6 +1214,38 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
verify(taskSetManagerSpy, times(1)).addPendingTask(anyInt())
}
+ test("SPARK-21563 context's added jars shouldn't change mid-TaskSet") {
+ sc = new SparkContext("local", "test")
+ val addedJarsPreTaskSet = Map[String, Long](sc.addedJars.toSeq: _*)
+ assert(addedJarsPreTaskSet.size === 0)
+
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ val taskSet1 = FakeTask.createTaskSet(3)
+ val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES, clock = new ManualClock)
+
+ // all tasks from the first taskset have the same jars
+ val taskOption1 = manager1.resourceOffer("exec1", "host1", NO_PREF)
+ assert(taskOption1.get.addedJars === addedJarsPreTaskSet)
+ val taskOption2 = manager1.resourceOffer("exec1", "host1", NO_PREF)
+ assert(taskOption2.get.addedJars === addedJarsPreTaskSet)
+
+ // even with a jar added mid-TaskSet
+ val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar")
+ sc.addJar(jarPath.toString)
+ val addedJarsMidTaskSet = Map[String, Long](sc.addedJars.toSeq: _*)
+ assert(addedJarsPreTaskSet !== addedJarsMidTaskSet)
+ val taskOption3 = manager1.resourceOffer("exec1", "host1", NO_PREF)
+ // which should have the old version of the jars list
+ assert(taskOption3.get.addedJars === addedJarsPreTaskSet)
+
+ // and then the jar does appear in the next TaskSet
+ val taskSet2 = FakeTask.createTaskSet(1)
+ val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, clock = new ManualClock)
+
+ val taskOption4 = manager2.resourceOffer("exec1", "host1", NO_PREF)
+ assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
+ }
+
private def createTaskResult(
id: Int,
accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
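The test pins down the deliberate trade-off here: tasks in an in-flight TaskSet see a slightly stale but internally consistent view of the jars and files, which is exactly the behavior the new `addFile`/`addJar` documentation above describes.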