You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/05/12 17:48:28 UTC

spark git commit: [SPARK-7532] [STREAMING] StreamingContext.start() made to logWarning and not throw exception

Repository: spark
Updated Branches:
  refs/heads/master f3e8e6006 -> ec6f2a977


[SPARK-7532] [STREAMING] StreamingContext.start() now logs a warning instead of throwing an exception when the context has already been started

Author: Tathagata Das <ta...@gmail.com>

Closes #6060 from tdas/SPARK-7532 and squashes the following commits:

6fe2e83 [Tathagata Das] Update docs
7dadfc3 [Tathagata Das] Fixed bug again
99c7678 [Tathagata Das] Added logInfo
65aec20 [Tathagata Das] Fix bug
5bf031b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7532
1a9a818 [Tathagata Das] Fix scaladoc
c584313 [Tathagata Das] StreamingContext.start() made to logWarning and not throw exception


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec6f2a97
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec6f2a97
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec6f2a97

Branch: refs/heads/master
Commit: ec6f2a9774167014566fb9608ee4394d2ce5fd6a
Parents: f3e8e60
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue May 12 08:48:24 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue May 12 08:48:24 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/StreamingContext.scala      | 27 ++++++++++----------
 .../spark/streaming/StreamingContextSuite.scala |  4 +--
 2 files changed, 14 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ec6f2a97/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 2c5834d..8461e90 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -528,28 +528,27 @@ class StreamingContext private[streaming] (
   /**
    * Start the execution of the streams.
    *
-   * @throws SparkException if the context has already been started or stopped.
+   * @throws SparkException if the StreamingContext is already stopped.
    */
   def start(): Unit = synchronized {
-    import StreamingContext._
     state match {
       case INITIALIZED =>
-        // good to start
+        validate()
+        startSite.set(DStream.getCreationSite())
+        sparkContext.setCallSite(startSite.get)
+        StreamingContext.ACTIVATION_LOCK.synchronized {
+          StreamingContext.assertNoOtherContextIsActive()
+          scheduler.start()
+          uiTab.foreach(_.attach())
+          state = StreamingContextState.ACTIVE
+          StreamingContext.setActiveContext(this)
+        }
+        logInfo("StreamingContext started")
       case ACTIVE =>
-        throw new SparkException("StreamingContext has already been started")
+        logWarning("StreamingContext has already been started")
       case STOPPED =>
         throw new SparkException("StreamingContext has already been stopped")
     }
-    validate()
-    startSite.set(DStream.getCreationSite())
-    sparkContext.setCallSite(startSite.get)
-    ACTIVATION_LOCK.synchronized {
-      assertNoOtherContextIsActive()
-      scheduler.start()
-      uiTab.foreach(_.attach())
-      state = StreamingContextState.ACTIVE
-      setActiveContext(this)
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/ec6f2a97/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index b8247db..4729951 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -136,9 +136,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     addInputStream(ssc).register()
     ssc.start()
     assert(ssc.getState() === StreamingContextState.ACTIVE)
-    intercept[SparkException] {
-      ssc.start()
-    }
+    ssc.start()
     assert(ssc.getState() === StreamingContextState.ACTIVE)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org