You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/10/29 03:14:49 UTC
spark git commit: [SPARK-18164][SQL] ForeachSink should fail the Spark job if `process` throws exception
Repository: spark
Updated Branches:
refs/heads/master ac26e9cf2 -> 59cccbda4
[SPARK-18164][SQL] ForeachSink should fail the Spark job if `process` throws exception
## What changes were proposed in this pull request?
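Fixed the issue that ForeachSink swallowed an exception thrown by `process`: the exception was passed to `close` but never rethrown, so the Spark job did not fail.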
## How was this patch tested?
Updated the existing "foreach with error" unit test in ForeachSinkSuite to verify that the query fails with the error.
Author: Shixiong Zhu <sh...@databricks.com>
Closes #15674 from zsxwing/foreach-sink-error.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59cccbda
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59cccbda
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59cccbda
Branch: refs/heads/master
Commit: 59cccbda489f25add3e10997e950de7e88704aa7
Parents: ac26e9c
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Fri Oct 28 20:14:38 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Fri Oct 28 20:14:38 2016 -0700
----------------------------------------------------------------------
.../sql/execution/streaming/ForeachSink.scala | 7 ++-----
.../execution/streaming/ForeachSinkSuite.scala | 19 ++++++++++++++-----
2 files changed, 16 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/59cccbda/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index 082664a..24f98b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -68,19 +68,16 @@ class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Seria
}
datasetWithIncrementalExecution.foreachPartition { iter =>
if (writer.open(TaskContext.getPartitionId(), batchId)) {
- var isFailed = false
try {
while (iter.hasNext) {
writer.process(iter.next())
}
} catch {
case e: Throwable =>
- isFailed = true
writer.close(e)
+ throw e
}
- if (!isFailed) {
- writer.close(null)
- }
+ writer.close(null)
} else {
writer.close(null)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/59cccbda/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
index 7928b8e..9e05921 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
@@ -23,8 +23,9 @@ import scala.collection.mutable
import org.scalatest.BeforeAndAfter
+import org.apache.spark.SparkException
import org.apache.spark.sql.ForeachWriter
-import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest}
import org.apache.spark.sql.test.SharedSQLContext
class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
@@ -136,7 +137,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
}
}
- test("foreach with error") {
+ testQuietly("foreach with error") {
withTempDir { checkpointDir =>
val input = MemoryStream[Int]
val query = input.toDS().repartition(1).writeStream
@@ -148,16 +149,24 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
}
}).start()
input.addData(1, 2, 3, 4)
- query.processAllAvailable()
+
+ // Error in `process` should fail the Spark job
+ val e = intercept[StreamingQueryException] {
+ query.processAllAvailable()
+ }
+ assert(e.getCause.isInstanceOf[SparkException])
+ assert(e.getCause.getCause.getMessage === "error")
+ assert(query.isActive === false)
val allEvents = ForeachSinkSuite.allEvents()
assert(allEvents.size === 1)
assert(allEvents(0)(0) === ForeachSinkSuite.Open(partition = 0, version = 0))
- assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 1))
+ assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 1))
+
+ // `close` should be called with the error
val errorEvent = allEvents(0)(2).asInstanceOf[ForeachSinkSuite.Close]
assert(errorEvent.error.get.isInstanceOf[RuntimeException])
assert(errorEvent.error.get.getMessage === "error")
- query.stop()
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org