Posted to commits@spark.apache.org by pw...@apache.org on 2014/04/23 04:35:17 UTC

git commit: [streaming][SPARK-1578] Removed requirement for TTL in StreamingContext.

Repository: spark
Updated Branches:
  refs/heads/master 2de573877 -> f3d19a9f1


[streaming][SPARK-1578] Removed requirement for TTL in StreamingContext.

Since shuffles and RDDs that go out of scope are automatically cleaned up by Spark core (using the ContextCleaner), there is no need to set the cleaner TTL when creating a StreamingContext.
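
As a minimal sketch of the new behavior (the master URL, app name, and batch interval here are illustrative, not part of this commit), a StreamingContext can now be created from an existing SparkContext without any TTL configuration:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Previously, this constructor threw a SparkException unless
    // spark.cleaner.ttl was set on the underlying conf.
    val conf = new SparkConf().setMaster("local[2]").setAppName("TtlExample")
    val sc = new SparkContext(conf)

    // Now it works as-is: Spark core's ContextCleaner removes out-of-scope
    // RDDs and shuffle data automatically, so no TTL is required.
    val ssc = new StreamingContext(sc, Seconds(1))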

Author: Tathagata Das <ta...@gmail.com>

Closes #491 from tdas/ttl-fix and squashes the following commits:

cf01dc7 [Tathagata Das] Removed requirement for TTL in StreamingContext.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f3d19a9f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f3d19a9f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f3d19a9f

Branch: refs/heads/master
Commit: f3d19a9f1a4e38ff9fb5bf78e04ed5d321219bf6
Parents: 2de5738
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Apr 22 19:35:13 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Apr 22 19:35:13 2014 -0700

----------------------------------------------------------------------
 .../spark/streaming/StreamingContext.scala      | 15 +------
 .../spark/streaming/InputStreamsSuite.scala     |  2 +-
 .../spark/streaming/StreamingContextSuite.scala | 45 ++++++--------------
 .../apache/spark/streaming/TestSuiteBase.scala  |  1 -
 4 files changed, 14 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f3d19a9f/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6d9dc87..9ba6e02 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -116,11 +116,6 @@ class StreamingContext private[streaming] (
     }
   }
 
-  if (MetadataCleaner.getDelaySeconds(sc.conf) < 0) {
-    throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
-      + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")
-  }
-
   private[streaming] val conf = sc.conf
 
   private[streaming] val env = SparkEnv.get
@@ -500,8 +495,6 @@ class StreamingContext private[streaming] (
 
 object StreamingContext extends Logging {
 
-  private[streaming] val DEFAULT_CLEANER_TTL = 3600
-
   implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = {
     new PairDStreamFunctions[K, V](stream)
   }
@@ -546,13 +539,7 @@ object StreamingContext extends Logging {
   def jarOfClass(cls: Class[_]): Option[String] = SparkContext.jarOfClass(cls)
 
   private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
-    // Set the default cleaner delay to an hour if not already set.
-    // This should be sufficient for even 1 second batch intervals.
-    if (MetadataCleaner.getDelaySeconds(conf) < 0) {
-      MetadataCleaner.setDelaySeconds(conf, DEFAULT_CLEANER_TTL)
-    }
-    val sc = new SparkContext(conf)
-    sc
+    new SparkContext(conf)
   }
 
   private[streaming] def createNewSparkContext(

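As the test changes below exercise, spark.cleaner.ttl remains an ordinary, optional setting; it is simply no longer mandatory. A minimal sketch (master URL, app name, and values are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // The TTL may still be set explicitly if periodic metadata
    // cleaning is wanted, but omitting it is now fine.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("OptionalTtl")
      .set("spark.cleaner.ttl", "3600") // seconds; optional
    val ssc = new StreamingContext(conf, Seconds(1))
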
http://git-wip-us.apache.org/repos/asf/spark/blob/f3d19a9f/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 46b7f63..3bad871 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -143,7 +143,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
   }
 
-  // TODO: This test makes assumptions about Thread.sleep() and is flaky
+  // TODO: This test works in IntelliJ but not through SBT
   ignore("actor input stream") {
     // Start the server
     val testServer = new TestServer()

http://git-wip-us.apache.org/repos/asf/spark/blob/f3d19a9f/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 6d14b1f..3e2b25a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -38,15 +38,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   val batchDuration = Milliseconds(500)
   val sparkHome = "someDir"
   val envPair = "key" -> "value"
-  val ttl = StreamingContext.DEFAULT_CLEANER_TTL + 100
 
   var sc: SparkContext = null
   var ssc: StreamingContext = null
 
-  before {
-    System.clearProperty("spark.cleaner.ttl")
-  }
-
   after {
     if (ssc != null) {
       ssc.stop()
@@ -62,67 +57,51 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     ssc = new StreamingContext(master, appName, batchDuration)
     assert(ssc.sparkContext.conf.get("spark.master") === master)
     assert(ssc.sparkContext.conf.get("spark.app.name") === appName)
-    assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
-      StreamingContext.DEFAULT_CLEANER_TTL)
   }
 
   test("from no conf + spark home") {
     ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil)
     assert(ssc.conf.get("spark.home") === sparkHome)
-    assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
-      StreamingContext.DEFAULT_CLEANER_TTL)
   }
 
   test("from no conf + spark home + env") {
     ssc = new StreamingContext(master, appName, batchDuration,
       sparkHome, Nil, Map(envPair))
     assert(ssc.conf.getExecutorEnv.exists(_ == envPair))
-    assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
-      StreamingContext.DEFAULT_CLEANER_TTL)
-  }
-
-  test("from conf without ttl set") {
-    val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    ssc = new StreamingContext(myConf, batchDuration)
-    assert(MetadataCleaner.getDelaySeconds(ssc.conf) ===
-      StreamingContext.DEFAULT_CLEANER_TTL)
   }
 
-  test("from conf with ttl set") {
+  test("from conf with settings") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", ttl.toString)
+    myConf.set("spark.cleaner.ttl", "10")
     ssc = new StreamingContext(myConf, batchDuration)
-    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl)
+    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
   }
 
-  test("from existing SparkContext without ttl set") {
+  test("from existing SparkContext") {
     sc = new SparkContext(master, appName)
-    val exception = intercept[SparkException] {
-      ssc = new StreamingContext(sc, batchDuration)
-    }
-    assert(exception.getMessage.contains("ttl"))
+    ssc = new StreamingContext(sc, batchDuration)
   }
 
-  test("from existing SparkContext with ttl set") {
+  test("from existing SparkContext with settings") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", ttl.toString)
+    myConf.set("spark.cleaner.ttl", "10")
     ssc = new StreamingContext(myConf, batchDuration)
-    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl)
+    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
   }
 
   test("from checkpoint") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", ttl.toString)
+    myConf.set("spark.cleaner.ttl", "10")
     val ssc1 = new StreamingContext(myConf, batchDuration)
     addInputStream(ssc1).register
     ssc1.start()
     val cp = new Checkpoint(ssc1, Time(1000))
-    assert(MetadataCleaner.getDelaySeconds(cp.sparkConf) === ttl)
+    assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
     ssc1.stop()
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-    assert(MetadataCleaner.getDelaySeconds(newCp.sparkConf) === ttl)
+    assert(newCp.sparkConf.getInt("spark.cleaner.ttl", -1) === 10)
     ssc = new StreamingContext(null, newCp, null)
-    assert(MetadataCleaner.getDelaySeconds(ssc.conf) === ttl)
+    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
   }
 
   test("start and stop state check") {

http://git-wip-us.apache.org/repos/asf/spark/blob/f3d19a9f/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index aa2d5c2..4f63fd3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -137,7 +137,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
   val conf = new SparkConf()
     .setMaster(master)
     .setAppName(framework)
-    .set("spark.cleaner.ttl", StreamingContext.DEFAULT_CLEANER_TTL.toString)
 
   // Default before function for any streaming test suite. Override this
   // if you want to add your stuff to "before" (i.e., don't call before { } )