You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2016/10/06 19:51:17 UTC

spark git commit: [SPARK-17780][SQL] Report Throwable to user in StreamExecution

Repository: spark
Updated Branches:
  refs/heads/master 79accf45a -> 9a48e60e6


[SPARK-17780][SQL] Report Throwable to user in StreamExecution

## What changes were proposed in this pull request?

When using an incompatible source for structured streaming, it may throw NoClassDefFoundError. It's better to just catch Throwable and report it to the user since the streaming thread is dying.

## How was this patch tested?

`test("NoClassDefFoundError from an incompatible source")`

Author: Shixiong Zhu <sh...@databricks.com>

Closes #15352 from zsxwing/SPARK-17780.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9a48e60e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9a48e60e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9a48e60e

Branch: refs/heads/master
Commit: 9a48e60e6319d85f2c3be3a3c608dab135e18a73
Parents: 79accf4
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Thu Oct 6 12:51:12 2016 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Oct 6 12:51:12 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala   |  7 ++++-
 .../spark/sql/streaming/StreamSuite.scala       | 31 +++++++++++++++++++-
 .../apache/spark/sql/streaming/StreamTest.scala |  3 +-
 3 files changed, 37 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9a48e60e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index b3a0d6a..333239f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -207,13 +207,18 @@ class StreamExecution(
       })
     } catch {
       case _: InterruptedException if state == TERMINATED => // interrupted by stop()
-      case NonFatal(e) =>
+      case e: Throwable =>
         streamDeathCause = new StreamingQueryException(
           this,
           s"Query $name terminated with exception: ${e.getMessage}",
           e,
           Some(committedOffsets.toCompositeOffset(sources)))
         logError(s"Query $name terminated with error", e)
+        // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
+        // handle them
+        if (!NonFatal(e)) {
+          throw e
+        }
     } finally {
       state = TERMINATED
       sparkSession.streams.notifyQueryTermination(StreamExecution.this)

http://git-wip-us.apache.org/repos/asf/spark/blob/9a48e60e/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 1caafb9..cdbad90 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.streaming
 
+import scala.reflect.ClassTag
+import scala.util.control.ControlThrowable
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources.StreamSourceProvider
-import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.ManualClock
 
@@ -236,6 +238,33 @@ class StreamSuite extends StreamTest {
     }
   }
 
+  testQuietly("fatal errors from a source should be sent to the user") {
+    for (e <- Seq(
+      new VirtualMachineError {},
+      new ThreadDeath,
+      new LinkageError,
+      new ControlThrowable {}
+    )) {
+      val source = new Source {
+        override def getOffset: Option[Offset] = {
+          throw e
+        }
+
+        override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+          throw e
+        }
+
+        override def schema: StructType = StructType(Array(StructField("value", IntegerType)))
+
+        override def stop(): Unit = {}
+      }
+      val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source))
+      testStream(df)(
+        ExpectFailure()(ClassTag(e.getClass))
+      )
+    }
+  }
+
   test("output mode API in Scala") {
     val o1 = OutputMode.Append
     assert(o1 === InternalOutputModes.Append)

http://git-wip-us.apache.org/repos/asf/spark/blob/9a48e60e/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 09140a1..fa13d38 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -167,7 +167,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
   /** Signals that a failure is expected and should not kill the test. */
   case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
     val causeClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
-    override def toString(): String = s"ExpectFailure[${causeClass.getCanonicalName}]"
+    override def toString(): String = s"ExpectFailure[${causeClass.getName}]"
   }
 
   /** Assert that a body is true */
@@ -322,7 +322,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
               new UncaughtExceptionHandler {
                 override def uncaughtException(t: Thread, e: Throwable): Unit = {
                   streamDeathCause = e
-                  testThread.interrupt()
                 }
               })
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org