You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/15 08:08:56 UTC
git commit: Merge pull request #435 from tdas/filestream-fix
Updated Branches:
refs/heads/branch-0.9 fbfbb331d -> 2859cab2f
Merge pull request #435 from tdas/filestream-fix
Fixed the flaky tests by making SparkConf not serializable
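SparkConf was being serialized with CoGroupedRDD and Aggregator, which somehow caused an OptionalDataException while being deserialized as part of a ShuffleMapTask. SparkConf should not even be serializable (according to a conversation with Matei). This change fixes that. Because SparkConf is no longer serializable, the streaming Checkpoint now stores the configuration as key-value pairs (ssc.conf.getAll) and rebuilds a SparkConf from them on demand.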
@mateiz @pwendell
(cherry picked from commit 139c24ef08e6ffb090975c9808a2cba304eb79e0)
Signed-off-by: Patrick Wendell <pw...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2859cab2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2859cab2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2859cab2
Branch: refs/heads/branch-0.9
Commit: 2859cab2f50099d1a691aecb5f7e5dfa26dccdb1
Parents: fbfbb33
Author: Patrick Wendell <pw...@gmail.com>
Authored: Tue Jan 14 23:07:55 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Jan 14 23:08:19 2014 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/Aggregator.scala | 3 +--
.../main/scala/org/apache/spark/SparkConf.scala | 3 ++-
.../org/apache/spark/rdd/CoGroupedRDD.scala | 3 +--
.../flume/src/test/resources/log4j.properties | 2 +-
.../kafka/src/test/resources/log4j.properties | 2 +-
.../streaming/kafka/KafkaStreamSuite.scala | 1 +
.../mqtt/src/test/resources/log4j.properties | 2 +-
.../spark/streaming/mqtt/MQTTStreamSuite.scala | 1 +
.../twitter/src/test/resources/log4j.properties | 2 +-
.../streaming/twitter/TwitterStreamSuite.scala | 1 +
.../zeromq/src/test/resources/log4j.properties | 2 +-
.../streaming/zeromq/ZeroMQStreamSuite.scala | 1 +
.../org/apache/spark/streaming/Checkpoint.scala | 10 ++++---
.../apache/spark/streaming/DStreamGraph.scala | 2 ++
.../dstream/DStreamCheckpointData.scala | 26 +++++++++++++++++-
.../spark/streaming/CheckpointSuite.scala | 28 ++++++++++++++------
16 files changed, 66 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index edbea6e..c4579cf 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -33,8 +33,7 @@ case class Aggregator[K, V, C] (
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {
- private val sparkConf = SparkEnv.get.conf
- private val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
+ private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
@deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 93d3d1f..369c6ce 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import com.typesafe.config.ConfigFactory
+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
/**
* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -41,7 +42,7 @@ import com.typesafe.config.ConfigFactory
*
* @param loadDefaults whether to load values from the system properties and classpath
*/
-class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with Logging {
+class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 9c6b308..f2feb40 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -66,7 +66,6 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
private type CoGroupValue = (Any, Int) // Int is dependency number
private type CoGroupCombiner = Seq[CoGroup]
- private val sparkConf = SparkEnv.get.conf
private var serializerClass: String = null
def setSerializer(cls: String): CoGroupedRDD[K] = {
@@ -106,7 +105,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
override val partitioner = Some(part)
override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
-
+ val sparkConf = SparkEnv.get.conf
val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/flume/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties
index 063529a..d1bd73a 100644
--- a/external/flume/src/test/resources/log4j.properties
+++ b/external/flume/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/flume/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/resources/log4j.properties b/external/kafka/src/test/resources/log4j.properties
index 063529a..38910d1 100644
--- a/external/kafka/src/test/resources/log4j.properties
+++ b/external/kafka/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/kafka/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 9c81f23..d9809f6 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -35,5 +35,6 @@ class KafkaStreamSuite extends TestSuiteBase {
ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
// TODO: Actually test receiving data
+ ssc.stop()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/mqtt/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/resources/log4j.properties b/external/mqtt/src/test/resources/log4j.properties
index 063529a..d0462c7 100644
--- a/external/mqtt/src/test/resources/log4j.properties
+++ b/external/mqtt/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/mqtt/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 73e7ce6..89c40ad 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -32,5 +32,6 @@ class MQTTStreamSuite extends TestSuiteBase {
val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
// TODO: Actually test receiving data
+ ssc.stop()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/twitter/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/twitter/src/test/resources/log4j.properties b/external/twitter/src/test/resources/log4j.properties
index 063529a..c918335 100644
--- a/external/twitter/src/test/resources/log4j.properties
+++ b/external/twitter/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/twitter/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index ccc3878..06ab0cd 100644
--- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -39,5 +39,6 @@ class TwitterStreamSuite extends TestSuiteBase {
// Note that actually testing the data receiving is hard as authentication keys are
// necessary for accessing Twitter live stream
+ ssc.stop()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/zeromq/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/resources/log4j.properties b/external/zeromq/src/test/resources/log4j.properties
index 063529a..304683d 100644
--- a/external/zeromq/src/test/resources/log4j.properties
+++ b/external/zeromq/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/zeromq/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 4193b8a..92d55a7 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -40,5 +40,6 @@ class ZeroMQStreamSuite extends TestSuiteBase {
StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
// TODO: Actually test data receiving
+ ssc.stop()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 5046a1d..4d778dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -42,11 +42,13 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.getPendingTimes().toArray
val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
- val sparkConf = ssc.conf
+ val sparkConfPairs = ssc.conf.getAll
- // These should be unset when a checkpoint is deserialized,
- // otherwise the SparkContext won't initialize correctly.
- sparkConf.remove("spark.driver.host").remove("spark.driver.port")
+ def sparkConf = {
+ new SparkConf(false).setAll(sparkConfPairs)
+ .remove("spark.driver.host")
+ .remove("spark.driver.port")
+ }
def validate() {
assert(master != null, "Checkpoint.master is null")
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 8faa79f..0683113 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -163,8 +163,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
logDebug("DStreamGraph.writeObject used")
this.synchronized {
checkpointInProgress = true
+ logDebug("Enabled checkpoint mode")
oos.defaultWriteObject()
checkpointInProgress = false
+ logDebug("Disabled checkpoint mode")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 38bad5a..906a16e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.dstream
import scala.collection.mutable.HashMap
import scala.reflect.ClassTag
-import java.io.{ObjectInputStream, IOException}
+import java.io.{ObjectOutputStream, ObjectInputStream, IOException}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.Logging
@@ -118,7 +118,31 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
}
@throws(classOf[IOException])
+ private def writeObject(oos: ObjectOutputStream) {
+ logDebug(this.getClass().getSimpleName + ".writeObject used")
+ if (dstream.context.graph != null) {
+ dstream.context.graph.synchronized {
+ if (dstream.context.graph.checkpointInProgress) {
+ oos.defaultWriteObject()
+ } else {
+ val msg = "Object of " + this.getClass.getName + " is being serialized " +
+ " possibly as a part of closure of an RDD operation. This is because " +
+ " the DStream object is being referred to from within the closure. " +
+ " Please rewrite the RDD operation inside this DStream to avoid this. " +
+ " This has been enforced to avoid bloating of Spark tasks " +
+ " with unnecessary objects."
+ throw new java.io.NotSerializableException(msg)
+ }
+ }
+ } else {
+ throw new java.io.NotSerializableException(
+ "Graph is unexpectedly null when DStream is being serialized.")
+ }
+ }
+
+ @throws(classOf[IOException])
private def readObject(ois: ObjectInputStream) {
+ logDebug(this.getClass().getSimpleName + ".readObject used")
ois.defaultReadObject()
timeToOldestCheckpointFileTime = new HashMap[Time, Time]
timeToCheckpointFile = new HashMap[Time, String]
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 89daf47..831e7c1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -151,17 +151,29 @@ class CheckpointSuite extends TestSuiteBase {
val value = "myvalue"
System.setProperty(key, value)
ssc = new StreamingContext(master, framework, batchDuration)
+ val originalConf = ssc.conf
+
val cp = new Checkpoint(ssc, Time(1000))
- assert(!cp.sparkConf.contains("spark.driver.host"))
- assert(!cp.sparkConf.contains("spark.driver.port"))
- assert(!cp.sparkConf.contains("spark.hostPort"))
- assert(cp.sparkConf.get(key) === value)
+ val cpConf = cp.sparkConf
+ assert(cpConf.get("spark.master") === originalConf.get("spark.master"))
+ assert(cpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+ assert(cpConf.get(key) === value)
ssc.stop()
+
+ // Serialize/deserialize to simulate write to storage and reading it back
val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
- assert(!newCp.sparkConf.contains("spark.driver.host"))
- assert(!newCp.sparkConf.contains("spark.driver.port"))
- assert(!newCp.sparkConf.contains("spark.hostPort"))
- assert(newCp.sparkConf.get(key) === value)
+
+ val newCpConf = newCp.sparkConf
+ assert(newCpConf.get("spark.master") === originalConf.get("spark.master"))
+ assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+ assert(newCpConf.get(key) === value)
+ assert(!newCpConf.contains("spark.driver.host"))
+ assert(!newCpConf.contains("spark.driver.port"))
+
+ // Check if all the parameters have been restored
+ ssc = new StreamingContext(null, newCp, null)
+ val restoredConf = ssc.conf
+ assert(restoredConf.get(key) === value)
}