You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/12/05 22:59:45 UTC
spark git commit: [SPARK-18694][SS] Add StreamingQuery.explain and
exception to Python and fix StreamingQueryException (branch 2.1)
Repository: spark
Updated Branches:
refs/heads/branch-2.1 39759ff00 -> c6a4e3d96
[SPARK-18694][SS] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException (branch 2.1)
## What changes were proposed in this pull request?
Backport #16125 to branch 2.1.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <sh...@databricks.com>
Closes #16153 from zsxwing/SPARK-18694-2.1.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6a4e3d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6a4e3d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6a4e3d9
Branch: refs/heads/branch-2.1
Commit: c6a4e3d96997bf166360524a95510b3490b68b49
Parents: 39759ff
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Mon Dec 5 14:59:42 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Mon Dec 5 14:59:42 2016 -0800
----------------------------------------------------------------------
project/MimaExcludes.scala | 9 ++++-
python/pyspark/sql/streaming.py | 40 +++++++++++++++++++
python/pyspark/sql/tests.py | 29 ++++++++++++++
.../execution/streaming/StreamExecution.scala | 5 ++-
.../sql/streaming/StreamingQueryException.scala | 42 ++++++++++++--------
.../apache/spark/sql/streaming/progress.scala | 7 ++++
.../apache/spark/sql/streaming/StreamTest.scala | 2 -
.../sql/streaming/StreamingQuerySuite.scala | 10 +++--
8 files changed, 119 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c6a4e3d9/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 9739164..9e63254 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -97,7 +97,14 @@ object MimaExcludes {
// [SPARK-18034] Upgrade to MiMa 0.1.11 to fix flakiness.
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.aggregationDepth"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.getAggregationDepth"),
- ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_=")
+ ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="),
+
+ // [SPARK-18694] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryException$"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.endOffset"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query")
)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c6a4e3d9/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 84f01d3..4a7d17b 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -30,6 +30,7 @@ from pyspark import since, keyword_only
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql.readwriter import OptionUtils, to_str
from pyspark.sql.types import *
+from pyspark.sql.utils import StreamingQueryException
__all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"]
@@ -132,6 +133,45 @@ class StreamingQuery(object):
"""
self._jsq.stop()
+ @since(2.1)
+ def explain(self, extended=False):
+ """Prints the (logical and physical) plans to the console for debugging purpose.
+
+ :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
+
+ >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
+ >>> sq.processAllAvailable() # Wait a bit to generate the runtime plans.
+ >>> sq.explain()
+ == Physical Plan ==
+ ...
+ >>> sq.explain(True)
+ == Parsed Logical Plan ==
+ ...
+ == Analyzed Logical Plan ==
+ ...
+ == Optimized Logical Plan ==
+ ...
+ == Physical Plan ==
+ ...
+ >>> sq.stop()
+ """
+ # Cannot call `_jsq.explain(...)` because it will print in the JVM process.
+ # We should print it in the Python process.
+ print(self._jsq.explainInternal(extended))
+
+ @since(2.1)
+ def exception(self):
+ """
+ :return: the StreamingQueryException if the query was terminated by an exception, or None.
+ """
+ if self._jsq.exception().isDefined():
+ je = self._jsq.exception().get()
+ msg = je.toString().split(': ', 1)[1] # Drop the Java StreamingQueryException type info
+ stackTrace = '\n\t at '.join(map(lambda x: x.toString(), je.getStackTrace()))
+ return StreamingQueryException(msg, stackTrace)
+ else:
+ return None
+
class StreamingQueryManager(object):
"""A class to manage all the :class:`StreamingQuery` StreamingQueries active.
http://git-wip-us.apache.org/repos/asf/spark/blob/c6a4e3d9/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 0aff9ce..9f34414 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1137,6 +1137,35 @@ class SQLTests(ReusedPySparkTestCase):
q.stop()
shutil.rmtree(tmpPath)
+ def test_stream_exception(self):
+ sdf = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
+ sq = sdf.writeStream.format('memory').queryName('query_explain').start()
+ try:
+ sq.processAllAvailable()
+ self.assertEqual(sq.exception(), None)
+ finally:
+ sq.stop()
+
+ from pyspark.sql.functions import col, udf
+ from pyspark.sql.utils import StreamingQueryException
+ bad_udf = udf(lambda x: 1 / 0)
+ sq = sdf.select(bad_udf(col("value")))\
+ .writeStream\
+ .format('memory')\
+ .queryName('this_query')\
+ .start()
+ try:
+ # Process some data to fail the query
+ sq.processAllAvailable()
+ self.fail("bad udf should fail the query")
+ except StreamingQueryException as e:
+ # This is expected
+ self.assertTrue("ZeroDivisionError" in e.desc)
+ finally:
+ sq.stop()
+ self.assertTrue(type(sq.exception()) is StreamingQueryException)
+ self.assertTrue("ZeroDivisionError" in sq.exception().desc)
+
def test_query_manager_await_termination(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for q in self.spark._wrapped.streams.active:
http://git-wip-us.apache.org/repos/asf/spark/blob/c6a4e3d9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8804c64..6b1c01a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -93,7 +93,7 @@ class StreamExecution(
* once, since the field's value may change at any time.
*/
@volatile
- protected var availableOffsets = new StreamProgress
+ var availableOffsets = new StreamProgress
/** The current batchId or -1 if execution has not yet been initialized. */
protected var currentBatchId: Long = -1
@@ -263,7 +263,8 @@ class StreamExecution(
this,
s"Query $name terminated with exception: ${e.getMessage}",
e,
- Some(committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)))
+ committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString,
+ availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString)
logError(s"Query $name terminated with error", e)
updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
// Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
http://git-wip-us.apache.org/repos/asf/spark/blob/c6a4e3d9/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 13f11ba..a96150a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -24,32 +24,42 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecut
* :: Experimental ::
* Exception that stopped a [[StreamingQuery]]. Use `cause` to get the actual exception
* that caused the failure.
- * @param query Query that caused the exception
* @param message Message of this exception
* @param cause Internal cause of this exception
- * @param startOffset Starting offset (if known) of the range of data in which exception occurred
- * @param endOffset Ending offset (if known) of the range of data in exception occurred
+ * @param startOffset Starting offset in json of the range of data in which exception occurred
+ * @param endOffset Ending offset in json of the range of data in which exception occurred
* @since 2.0.0
*/
@Experimental
-class StreamingQueryException private[sql](
- @transient val query: StreamingQuery,
+class StreamingQueryException private(
+ causeString: String,
val message: String,
val cause: Throwable,
- val startOffset: Option[OffsetSeq] = None,
- val endOffset: Option[OffsetSeq] = None)
+ val startOffset: String,
+ val endOffset: String)
extends Exception(message, cause) {
+ private[sql] def this(
+ query: StreamingQuery,
+ message: String,
+ cause: Throwable,
+ startOffset: String,
+ endOffset: String) {
+ this(
+ // scalastyle:off
+ s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}
+ |
+ |${query.asInstanceOf[StreamExecution].toDebugString}
+ """.stripMargin,
+ // scalastyle:on
+ message,
+ cause,
+ startOffset,
+ endOffset)
+ }
+
/** Time when the exception occurred */
val time: Long = System.currentTimeMillis
- override def toString(): String = {
- val causeStr =
- s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}"
- s"""
- |$causeStr
- |
- |${query.asInstanceOf[StreamExecution].toDebugString}
- """.stripMargin
- }
+ override def toString(): String = causeString
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c6a4e3d9/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 4c82474..fb5bad0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -38,6 +38,13 @@ import org.apache.spark.annotation.Experimental
class StateOperatorProgress private[sql](
val numRowsTotal: Long,
val numRowsUpdated: Long) {
+
+ /** The compact JSON representation of this progress. */
+ def json: String = compact(render(jsonValue))
+
+ /** The pretty (i.e. indented) JSON representation of this progress. */
+ def prettyJson: String = pretty(render(jsonValue))
+
private[sql] def jsonValue: JValue = {
("numRowsTotal" -> JInt(numRowsTotal)) ~
("numRowsUpdated" -> JInt(numRowsUpdated))
http://git-wip-us.apache.org/repos/asf/spark/blob/c6a4e3d9/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index a2629f7..4332265 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -412,8 +412,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
eventually("microbatch thread not stopped after termination with failure") {
assert(!currentStream.microBatchThread.isAlive)
}
- verify(thrownException.query.eq(currentStream),
- s"incorrect query reference in exception")
verify(currentStream.exception === Some(thrownException),
s"incorrect exception returned by query.exception()")
http://git-wip-us.apache.org/repos/asf/spark/blob/c6a4e3d9/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 56abe12..f7fc194 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -103,10 +103,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
TestAwaitTermination(ExpectException[SparkException]),
TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
- AssertOnQuery(
- q => q.exception.get.startOffset.get.offsets ===
- q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").offsets,
- "incorrect start offset on exception")
+ AssertOnQuery(q => {
+ q.exception.get.startOffset ===
+ q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").toString &&
+ q.exception.get.endOffset ===
+ q.availableOffsets.toOffsetSeq(Seq(inputData), "{}").toString
+ }, "incorrect start offset or end offset on exception")
)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org