Posted to commits@spark.apache.org by td...@apache.org on 2015/05/21 09:31:09 UTC

spark git commit: [SPARK-7745] Change asserts to requires for user input checks in Spark Streaming

Repository: spark
Updated Branches:
  refs/heads/master 947ea1cf5 -> 1ee8eb431


[SPARK-7745] Change asserts to requires for user input checks in Spark Streaming

Assertions can be turned off at runtime. `require` throws an `IllegalArgumentException`, which makes more sense when the check guards a user-set variable.
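
For context, a minimal standalone sketch of why the distinction matters (not part of this commit; object and method names are illustrative): Scala's `assert` is elidable at compile time (e.g. with scalac's `-Xdisable-assertions`) and throws `java.lang.AssertionError` when it fires, whereas `require` is always evaluated and throws `IllegalArgumentException`.

    object AssertVsRequire {
      // Elidable: compiled out entirely under scalac -Xdisable-assertions; when it
      // does fire, it throws java.lang.AssertionError, signalling an internal bug.
      def viaAssert(batchMillis: Long): Unit =
        assert(batchMillis > 0, "Batch duration must be positive")

      // Always evaluated: throws java.lang.IllegalArgumentException with the
      // message "requirement failed: Batch duration must be positive".
      def viaRequire(batchMillis: Long): Unit =
        require(batchMillis > 0, "Batch duration must be positive")

      def main(args: Array[String]): Unit = {
        try viaRequire(-1)
        catch { case e: IllegalArgumentException => println(e.getMessage) }
        // viaAssert(-1) would instead throw AssertionError, unless assertions
        // were elided at compile time, in which case the bad value slips through.
      }
    }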

Author: Burak Yavuz <br...@gmail.com>

Closes #6271 from brkyvz/streaming-require and squashes the following commits:

d249484 [Burak Yavuz] fix merge conflict
264adb8 [Burak Yavuz] addressed comments v1.0
6161350 [Burak Yavuz] fix tests
16aa766 [Burak Yavuz] changed more assertions to more meaningful errors
afd923d [Burak Yavuz] changed some assertions to require


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ee8eb43
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ee8eb43
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ee8eb43

Branch: refs/heads/master
Commit: 1ee8eb431e04db16f95f0bcb3a546ad6e14b616f
Parents: 947ea1c
Author: Burak Yavuz <br...@gmail.com>
Authored: Thu May 21 00:30:55 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu May 21 00:30:55 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/streaming/DStreamGraph.scala   |  4 +-
 .../spark/streaming/StreamingContext.scala      | 11 ++---
 .../streaming/api/python/PythonDStream.scala    |  4 +-
 .../spark/streaming/dstream/DStream.scala       | 45 ++++++++++----------
 .../dstream/ReducedWindowedDStream.scala        |  4 +-
 .../scheduler/ReceivedBlockTracker.scala        |  2 +-
 .../spark/streaming/StreamingContextSuite.scala |  6 +--
 7 files changed, 38 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1ee8eb43/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 85b354f..40789c6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -157,10 +157,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
 
   def validate() {
     this.synchronized {
-      assert(batchDuration != null, "Batch duration has not been set")
+      require(batchDuration != null, "Batch duration has not been set")
       // assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
       // " is very low")
-      assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute")
+      require(getOutputStreams().size > 0, "No output operations registered, so nothing to execute")
     }
   }
 

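A sketch of the user-visible effect of the change above (illustrative local-mode setup, not part of this commit): starting a context with an input stream but no registered output operation now fails with an `IllegalArgumentException` carrying the clearer "No output operations registered" message, instead of an `AssertionError`.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object NoOutputOpsSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("no-output-ops")
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.socketTextStream("localhost", 9999) // input registered, but no output operation
        try ssc.start() // DStreamGraph.validate() rejects the graph
        catch { case e: IllegalArgumentException => println(e.getMessage) }
      }
    }
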
http://git-wip-us.apache.org/repos/asf/spark/blob/1ee8eb43/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 9506369..160fc42 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -156,7 +156,7 @@ class StreamingContext private[streaming] (
       cp_.graph.restoreCheckpointData()
       cp_.graph
     } else {
-      assert(batchDur_ != null, "Batch duration for streaming context cannot be null")
+      require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
       val newGraph = new DStreamGraph()
       newGraph.setBatchDuration(batchDur_)
       newGraph
@@ -462,7 +462,8 @@ class StreamingContext private[streaming] (
       directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf)
     val data = br.map { case (k, v) =>
       val bytes = v.getBytes
-      assert(bytes.length == recordLength, "Byte array does not have correct length")
+      require(bytes.length == recordLength, "Byte array does not have correct length. " +
+        s"${bytes.length} did not equal recordLength: $recordLength")
       bytes
     }
     data
@@ -568,7 +569,7 @@ class StreamingContext private[streaming] (
   /**
    * Start the execution of the streams.
    *
-   * @throws SparkException if the StreamingContext is already stopped.
+   * @throws IllegalStateException if the StreamingContext is already stopped.
    */
   def start(): Unit = synchronized {
     state match {
@@ -587,7 +588,7 @@ class StreamingContext private[streaming] (
       case ACTIVE =>
         logWarning("StreamingContext has already been started")
       case STOPPED =>
-        throw new SparkException("StreamingContext has already been stopped")
+        throw new IllegalStateException("StreamingContext has already been stopped")
     }
   }
 
@@ -689,7 +690,7 @@ object StreamingContext extends Logging {
   private def assertNoOtherContextIsActive(): Unit = {
     ACTIVATION_LOCK.synchronized {
       if (activeContext.get() != null) {
-        throw new SparkException(
+        throw new IllegalStateException(
           "Only one StreamingContext may be started in this JVM. " +
             "Currently running StreamingContext was started at" +
             activeContext.get.startSite.get.longForm)

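A sketch of the new behavior of `start()` on a stopped context (illustrative setup, not from this commit); callers that previously matched on `SparkException` should now expect `IllegalStateException`:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StartAfterStopSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("start-after-stop")
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.socketTextStream("localhost", 9999).print() // one output op, so start() is legal
        ssc.start()
        ssc.stop()
        try ssc.start() // now throws IllegalStateException, not SparkException
        catch { case e: IllegalStateException => println(e.getMessage) }
      }
    }
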
http://git-wip-us.apache.org/repos/asf/spark/blob/1ee8eb43/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 4c28654..d064012 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -109,7 +109,7 @@ private[python] object PythonTransformFunctionSerializer {
   }
 
   def serialize(func: PythonTransformFunction): Array[Byte] = {
-    assert(serializer != null, "Serializer has not been registered!")
+    require(serializer != null, "Serializer has not been registered!")
     // get the id of PythonTransformFunction in py4j
     val h = Proxy.getInvocationHandler(func.asInstanceOf[Proxy])
     val f = h.getClass().getDeclaredField("id")
@@ -119,7 +119,7 @@ private[python] object PythonTransformFunctionSerializer {
   }
 
   def deserialize(bytes: Array[Byte]): PythonTransformFunction = {
-    assert(serializer != null, "Serializer has not been registered!")
+    require(serializer != null, "Serializer has not been registered!")
     serializer.loads(bytes)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1ee8eb43/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 7c50a76..c858647 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -217,53 +217,52 @@ abstract class DStream[T: ClassTag] (
       case StreamingContextState.INITIALIZED =>
         // good to go
       case StreamingContextState.ACTIVE =>
-        throw new SparkException(
+        throw new IllegalStateException(
           "Adding new inputs, transformations, and output operations after " +
             "starting a context is not supported")
       case StreamingContextState.STOPPED =>
-        throw new SparkException(
+        throw new IllegalStateException(
           "Adding new inputs, transformations, and output operations after " +
             "stopping a context is not supported")
     }
   }
 
   private[streaming] def validateAtStart() {
-    assert(rememberDuration != null, "Remember duration is set to null")
+    require(rememberDuration != null, "Remember duration is set to null")
 
-    assert(
+    require(
       !mustCheckpoint || checkpointDuration != null,
       "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." +
         " Please use DStream.checkpoint() to set the interval."
     )
 
-    assert(
+    require(
      checkpointDuration == null || context.sparkContext.checkpointDir.isDefined,
-      "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" +
-      " or SparkContext.checkpoint() to set the checkpoint directory."
+      "The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()."
     )
 
-    assert(
+    require(
       checkpointDuration == null || checkpointDuration >= slideDuration,
       "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
         checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " +
         "Please set it to at least " + slideDuration + "."
     )
 
-    assert(
+    require(
       checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration),
       "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
         checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " +
-        "Please set it to a multiple " + slideDuration + "."
+        "Please set it to a multiple of " + slideDuration + "."
     )
 
-    assert(
+    require(
       checkpointDuration == null || storageLevel != StorageLevel.NONE,
       "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " +
         "level has not been set to enable persisting. Please use DStream.persist() to set the " +
         "storage level to use memory for better checkpointing performance."
     )
 
-    assert(
+    require(
       checkpointDuration == null || rememberDuration > checkpointDuration,
       "The remember duration for " + this.getClass.getSimpleName + " has been set to " +
         rememberDuration + " which is not more than the checkpoint interval (" +
@@ -272,7 +271,7 @@ abstract class DStream[T: ClassTag] (
 
     val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf)
     logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
-    assert(
+    require(
       metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
       "It seems you are doing some DStream window operation or setting a checkpoint interval " +
         "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
@@ -633,8 +632,8 @@ abstract class DStream[T: ClassTag] (
    * 'this' DStream will be registered as an output stream and therefore materialized.
    */
   def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
-    // because the DStream is reachable from the outer object here, and because 
-    // DStreams can't be serialized with closures, we can't proactively check 
+    // because the DStream is reachable from the outer object here, and because
+    // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
   }
@@ -644,8 +643,8 @@ abstract class DStream[T: ClassTag] (
    * on each RDD of 'this' DStream.
    */
   def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
-    // because the DStream is reachable from the outer object here, and because 
-    // DStreams can't be serialized with closures, we can't proactively check 
+    // because the DStream is reachable from the outer object here, and because
+    // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = context.sparkContext.clean(transformFunc, false)
     transform((r: RDD[T], t: Time) => cleanedF(r))
@@ -656,8 +655,8 @@ abstract class DStream[T: ClassTag] (
    * on each RDD of 'this' DStream.
    */
   def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope {
-    // because the DStream is reachable from the outer object here, and because 
-    // DStreams can't be serialized with closures, we can't proactively check 
+    // because the DStream is reachable from the outer object here, and because
+    // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = context.sparkContext.clean(transformFunc, false)
     val realTransformFunc =  (rdds: Seq[RDD[_]], time: Time) => {
@@ -674,8 +673,8 @@ abstract class DStream[T: ClassTag] (
   def transformWith[U: ClassTag, V: ClassTag](
       other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
     ): DStream[V] = ssc.withScope {
-    // because the DStream is reachable from the outer object here, and because 
-    // DStreams can't be serialized with closures, we can't proactively check 
+    // because the DStream is reachable from the outer object here, and because
+    // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = ssc.sparkContext.clean(transformFunc, false)
     transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
@@ -688,8 +687,8 @@ abstract class DStream[T: ClassTag] (
   def transformWith[U: ClassTag, V: ClassTag](
       other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
     ): DStream[V] = ssc.withScope {
-    // because the DStream is reachable from the outer object here, and because 
-    // DStreams can't be serialized with closures, we can't proactively check 
+    // because the DStream is reachable from the outer object here, and because
+    // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = ssc.sparkContext.clean(transformFunc, false)
     val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {

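These checks run in `validateAtStart()` when the context is started, so a misconfigured checkpoint interval now surfaces as an `IllegalArgumentException` at `start()` time rather than an `AssertionError`. A sketch (illustrative values; the socket source and checkpoint path are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object CheckpointIntervalSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-interval")
        val ssc = new StreamingContext(conf, Seconds(2))
        ssc.checkpoint("/tmp/sketch-checkpoints")
        val lines = ssc.socketTextStream("localhost", 9999)
        lines.checkpoint(Seconds(3)) // 3s is not a multiple of the 2s slide duration
        lines.print()
        try ssc.start() // fails the isMultipleOf require in validateAtStart()
        catch { case e: IllegalArgumentException => println(e.getMessage) }
      }
    }
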
http://git-wip-us.apache.org/repos/asf/spark/blob/1ee8eb43/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index 1385ccb..df9f7f1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -40,12 +40,12 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
     partitioner: Partitioner
   ) extends DStream[(K,V)](parent.ssc) {
 
-  assert(_windowDuration.isMultipleOf(parent.slideDuration),
+  require(_windowDuration.isMultipleOf(parent.slideDuration),
     "The window duration of ReducedWindowedDStream (" + _windowDuration + ") " +
       "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
   )
 
-  assert(_slideDuration.isMultipleOf(parent.slideDuration),
+  require(_slideDuration.isMultipleOf(parent.slideDuration),
     "The slide duration of ReducedWindowedDStream (" + _slideDuration + ") " +
       "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
   )

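Because these `require` calls sit in the class body, they fire when the windowed stream is constructed, before the context even starts. A sketch using `reduceByKeyAndWindow` with an inverse reduce function, which is the path that builds a ReducedWindowedDStream (values and setup illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object WindowDurationSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("window-duration")
        val ssc = new StreamingContext(conf, Seconds(2))
        ssc.checkpoint("/tmp/sketch-checkpoints") // inverse-reduce windows must checkpoint
        val pairs = ssc.socketTextStream("localhost", 9999).map(word => (word, 1))
        try {
          // A 5s window is not a multiple of the 2s parent slide duration, so the
          // require in ReducedWindowedDStream's body throws at construction time.
          pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(5), Seconds(2))
        } catch { case e: IllegalArgumentException => println(e.getMessage) }
      }
    }
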
http://git-wip-us.apache.org/repos/asf/spark/blob/1ee8eb43/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index a9f4147..7720259 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -153,7 +153,7 @@ private[streaming] class ReceivedBlockTracker(
    * returns only after the files are cleaned up.
    */
   def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
-    assert(cleanupThreshTime.milliseconds < clock.getTimeMillis())
+    require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
     val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
     logInfo("Deleting batches " + timesToCleanup)
     writeToLog(BatchCleanupEvent(timesToCleanup))

http://git-wip-us.apache.org/repos/asf/spark/blob/1ee8eb43/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 3a958bf..f8e8030 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -182,7 +182,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()
     ssc.stop()
-    intercept[SparkException] {
+    intercept[IllegalStateException] {
       ssc.start() // start after stop should throw exception
     }
     assert(ssc.getState() === StreamingContextState.STOPPED)
@@ -600,7 +600,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     val anotherInput = addInputStream(anotherSsc)
     anotherInput.foreachRDD { rdd => rdd.count }
 
-    val exception = intercept[SparkException] {
+    val exception = intercept[IllegalStateException] {
       anotherSsc.start()
     }
     assert(exception.getMessage.contains("StreamingContext"), "Did not get the right exception")
@@ -623,7 +623,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
 
     def testForException(clue: String, expectedErrorMsg: String)(body: => Unit): Unit = {
       withClue(clue) {
-        val ex = intercept[SparkException] {
+        val ex = intercept[IllegalStateException] {
           body
         }
         assert(ex.getMessage.toLowerCase().contains(expectedErrorMsg))


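The updated tests capture the new exception type with ScalaTest's `intercept`, which is also the natural pattern for user test suites that exercised the old `SparkException` behavior; a minimal sketch (suite and test names illustrative):

    import org.scalatest.FunSuite
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    class StartAfterStopSuite extends FunSuite {
      test("start after stop throws IllegalStateException") {
        val conf = new SparkConf().setMaster("local[2]").setAppName("test")
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.socketTextStream("localhost", 9999).print()
        ssc.stop()
        // intercept returns the thrown exception so its message can be inspected
        val ex = intercept[IllegalStateException] { ssc.start() }
        assert(ex.getMessage.contains("stopped"))
      }
    }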