Posted to commits@spark.apache.org by td...@apache.org on 2016/12/05 19:36:16 UTC

spark git commit: [SPARK-18694][SS] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException

Repository: spark
Updated Branches:
  refs/heads/master 410b78986 -> 246012859


[SPARK-18694][SS] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException

## What changes were proposed in this pull request?

- Add StreamingQuery.explain and exception to Python (see the usage sketch after this list).
- Fix StreamingQueryException so that it no longer exposes `OffsetSeq`.
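
For illustration only (not part of the patch): a minimal sketch of how the two new Python methods fit together, assuming a running SparkSession `spark` and the text files under python/test_support/sql/streaming; the query name 'sketch_query' is made up for this example.

    sdf = spark.readStream.format('text').load('python/test_support/sql/streaming')
    sq = sdf.writeStream.format('memory').queryName('sketch_query').start()
    sq.processAllAvailable()         # wait so the runtime plans exist
    sq.explain()                     # prints only the physical plan
    sq.explain(extended=True)        # also prints parsed/analyzed/optimized logical plans
    if sq.exception() is not None:   # None here, since this query has not failed
        print(sq.exception().desc)
    sq.stop()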

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <sh...@databricks.com>

Closes #16125 from zsxwing/py-streaming-explain.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24601285
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24601285
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24601285

Branch: refs/heads/master
Commit: 246012859f0ed5248809a2e00e8355fbdaa8beb5
Parents: 410b789
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Mon Dec 5 11:36:11 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Dec 5 11:36:11 2016 -0800

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |  9 ++++-
 python/pyspark/sql/streaming.py                 | 40 +++++++++++++++++++
 python/pyspark/sql/tests.py                     | 29 ++++++++++++++
 .../execution/streaming/StreamExecution.scala   |  5 ++-
 .../sql/streaming/StreamingQueryException.scala | 42 ++++++++++++--------
 .../apache/spark/sql/streaming/progress.scala   |  7 ++++
 .../apache/spark/sql/streaming/StreamTest.scala |  2 -
 .../sql/streaming/StreamingQuerySuite.scala     | 10 +++--
 8 files changed, 119 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/24601285/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 7fed8cb..f3e5a21 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -106,7 +106,14 @@ object MimaExcludes {
       ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="),
 
       // [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer
-      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables")
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables"),
+
+      // [SPARK-18694] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryException$"),
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.endOffset"),
+      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query")
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/24601285/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 84f01d3..4a7d17b 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -30,6 +30,7 @@ from pyspark import since, keyword_only
 from pyspark.rdd import ignore_unicode_prefix
 from pyspark.sql.readwriter import OptionUtils, to_str
 from pyspark.sql.types import *
+from pyspark.sql.utils import StreamingQueryException
 
 __all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"]
 
@@ -132,6 +133,45 @@ class StreamingQuery(object):
         """
         self._jsq.stop()
 
+    @since(2.1)
+    def explain(self, extended=False):
+        """Prints the (logical and physical) plans to the console for debugging purpose.
+
+        :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
+
+        >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
+        >>> sq.processAllAvailable() # Wait a bit to generate the runtime plans.
+        >>> sq.explain()
+        == Physical Plan ==
+        ...
+        >>> sq.explain(True)
+        == Parsed Logical Plan ==
+        ...
+        == Analyzed Logical Plan ==
+        ...
+        == Optimized Logical Plan ==
+        ...
+        == Physical Plan ==
+        ...
+        >>> sq.stop()
+        """
+        # Cannot call `_jsq.explain(...)` because it will print in the JVM process.
+        # We should print it in the Python process.
+        print(self._jsq.explainInternal(extended))
+
+    @since(2.1)
+    def exception(self):
+        """
+        :return: the StreamingQueryException if the query was terminated by an exception, or None.
+        """
+        if self._jsq.exception().isDefined():
+            je = self._jsq.exception().get()
+            msg = je.toString().split(': ', 1)[1]  # Drop the Java StreamingQueryException type info
+            stackTrace = '\n\t at '.join(map(lambda x: x.toString(), je.getStackTrace()))
+            return StreamingQueryException(msg, stackTrace)
+        else:
+            return None
+
 
 class StreamingQueryManager(object):
     """A class to manage all the :class:`StreamingQuery` StreamingQueries active.

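As context for the new `exception()` accessor, a rough sketch (not part of the patch) of how a failing query surfaces in Python; it assumes the streaming DataFrame `sdf` from the example above, the UDF and query name are made up here, and the pattern mirrors the test added to python/pyspark/sql/tests.py below:

    from pyspark.sql.functions import col, udf
    from pyspark.sql.utils import StreamingQueryException

    bad_udf = udf(lambda x: 1 / 0)   # deliberately raises ZeroDivisionError per row
    sq = sdf.select(bad_udf(col('value'))) \
        .writeStream.format('memory').queryName('failing_query').start()
    try:
        sq.processAllAvailable()     # raises once the streaming batch fails
    except StreamingQueryException as e:
        print(e.desc)                # the description includes 'ZeroDivisionError'
    finally:
        sq.stop()
    assert isinstance(sq.exception(), StreamingQueryException)
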
http://git-wip-us.apache.org/repos/asf/spark/blob/24601285/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 0aff9ce..9f34414 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1137,6 +1137,35 @@ class SQLTests(ReusedPySparkTestCase):
             q.stop()
             shutil.rmtree(tmpPath)
 
+    def test_stream_exception(self):
+        sdf = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
+        sq = sdf.writeStream.format('memory').queryName('query_explain').start()
+        try:
+            sq.processAllAvailable()
+            self.assertEqual(sq.exception(), None)
+        finally:
+            sq.stop()
+
+        from pyspark.sql.functions import col, udf
+        from pyspark.sql.utils import StreamingQueryException
+        bad_udf = udf(lambda x: 1 / 0)
+        sq = sdf.select(bad_udf(col("value")))\
+            .writeStream\
+            .format('memory')\
+            .queryName('this_query')\
+            .start()
+        try:
+            # Process some data to fail the query
+            sq.processAllAvailable()
+            self.fail("bad udf should fail the query")
+        except StreamingQueryException as e:
+            # This is expected
+            self.assertTrue("ZeroDivisionError" in e.desc)
+        finally:
+            sq.stop()
+        self.assertTrue(type(sq.exception()) is StreamingQueryException)
+        self.assertTrue("ZeroDivisionError" in sq.exception().desc)
+
     def test_query_manager_await_termination(self):
         df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
         for q in self.spark._wrapped.streams.active:

http://git-wip-us.apache.org/repos/asf/spark/blob/24601285/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8804c64..6b1c01a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -93,7 +93,7 @@ class StreamExecution(
    * once, since the field's value may change at any time.
    */
   @volatile
-  protected var availableOffsets = new StreamProgress
+  var availableOffsets = new StreamProgress
 
   /** The current batchId or -1 if execution has not yet been initialized. */
   protected var currentBatchId: Long = -1
@@ -263,7 +263,8 @@ class StreamExecution(
           this,
           s"Query $name terminated with exception: ${e.getMessage}",
           e,
-          Some(committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)))
+          committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString,
+          availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString)
         logError(s"Query $name terminated with error", e)
         updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
         // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to

http://git-wip-us.apache.org/repos/asf/spark/blob/24601285/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 13f11ba..a96150a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -24,32 +24,42 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecut
  * :: Experimental ::
 * Exception that stopped a [[StreamingQuery]]. Use `cause` to get the actual exception
 * that caused the failure.
- * @param query       Query that caused the exception
  * @param message     Message of this exception
  * @param cause       Internal cause of this exception
- * @param startOffset Starting offset (if known) of the range of data in which exception occurred
- * @param endOffset   Ending offset (if known) of the range of data in exception occurred
+ * @param startOffset Starting offset in JSON of the range of data in which the exception occurred
+ * @param endOffset   Ending offset in JSON of the range of data in which the exception occurred
  * @since 2.0.0
  */
 @Experimental
-class StreamingQueryException private[sql](
-    @transient val query: StreamingQuery,
+class StreamingQueryException private(
+    causeString: String,
     val message: String,
     val cause: Throwable,
-    val startOffset: Option[OffsetSeq] = None,
-    val endOffset: Option[OffsetSeq] = None)
+    val startOffset: String,
+    val endOffset: String)
   extends Exception(message, cause) {
 
+  private[sql] def this(
+      query: StreamingQuery,
+      message: String,
+      cause: Throwable,
+      startOffset: String,
+      endOffset: String) {
+    this(
+      // scalastyle:off
+      s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}
+         |
+         |${query.asInstanceOf[StreamExecution].toDebugString}
+         """.stripMargin,
+      // scalastyle:on
+      message,
+      cause,
+      startOffset,
+      endOffset)
+  }
+
   /** Time when the exception occurred */
   val time: Long = System.currentTimeMillis
 
-  override def toString(): String = {
-    val causeStr =
-      s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}"
-    s"""
-       |$causeStr
-       |
-       |${query.asInstanceOf[StreamExecution].toDebugString}
-       """.stripMargin
-  }
+  override def toString(): String = causeString
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/24601285/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 4c82474..fb5bad0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -38,6 +38,13 @@ import org.apache.spark.annotation.Experimental
 class StateOperatorProgress private[sql](
     val numRowsTotal: Long,
     val numRowsUpdated: Long) {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
   private[sql] def jsonValue: JValue = {
     ("numRowsTotal" -> JInt(numRowsTotal)) ~
     ("numRowsUpdated" -> JInt(numRowsUpdated))

http://git-wip-us.apache.org/repos/asf/spark/blob/24601285/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index a2629f7..4332265 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -412,8 +412,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
               eventually("microbatch thread not stopped after termination with failure") {
                 assert(!currentStream.microBatchThread.isAlive)
               }
-              verify(thrownException.query.eq(currentStream),
-                s"incorrect query reference in exception")
               verify(currentStream.exception === Some(thrownException),
                 s"incorrect exception returned by query.exception()")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/24601285/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 56abe12..f7fc194 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -103,10 +103,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       TestAwaitTermination(ExpectException[SparkException]),
       TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
       TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
-      AssertOnQuery(
-        q => q.exception.get.startOffset.get.offsets ===
-          q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").offsets,
-        "incorrect start offset on exception")
+      AssertOnQuery(q => {
+        q.exception.get.startOffset ===
+          q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").toString &&
+          q.exception.get.endOffset ===
+            q.availableOffsets.toOffsetSeq(Seq(inputData), "{}").toString
+      }, "incorrect start offset or end offset on exception")
     )
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org