You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/02/15 08:59:49 UTC
spark git commit: [SPARK-23419][SPARK-23416][SS] data source v2 write path should re-throw interruption exceptions directly
Repository: spark
Updated Branches:
refs/heads/master 95e4b4916 -> f38c76063
[SPARK-23419][SPARK-23416][SS] data source v2 write path should re-throw interruption exceptions directly
## What changes were proposed in this pull request?
Streaming execution has a list of exceptions that signal an interruption, and it handles them specially. `WriteToDataSourceV2Exec` should also respect this list and re-throw these exceptions directly instead of wrapping them in a `SparkException`.
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <we...@databricks.com>
Closes #20605 from cloud-fan/write.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f38c7606
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f38c7606
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f38c7606
Branch: refs/heads/master
Commit: f38c760638063f1fb45e9ee2c772090fb203a4a0
Parents: 95e4b49
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu Feb 15 16:59:44 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Feb 15 16:59:44 2018 +0800
----------------------------------------------------------------------
.../datasources/v2/WriteToDataSourceV2.scala | 11 +++++-
.../execution/streaming/StreamExecution.scala | 40 ++++++++++----------
2 files changed, 31 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f38c7606/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index 535e796..41cdfc8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources.v2
+import scala.util.control.NonFatal
+
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.internal.Logging
@@ -27,6 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
@@ -107,7 +110,13 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e
throw new SparkException("Writing job failed.", cause)
}
logError(s"Data source writer $writer aborted.")
- throw new SparkException("Writing job aborted.", cause)
+ cause match {
+ // Do not wrap interruption exceptions that will be handled by streaming specially.
+ case _ if StreamExecution.isInterruptionException(cause) => throw cause
+ // Only wrap non fatal exceptions.
+ case NonFatal(e) => throw new SparkException("Writing job aborted.", e)
+ case _ => throw cause
+ }
}
sparkContext.emptyRDD
http://git-wip-us.apache.org/repos/asf/spark/blob/f38c7606/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index e7982d7..3fc8c78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -356,25 +356,7 @@ abstract class StreamExecution(
private def isInterruptedByStop(e: Throwable): Boolean = {
if (state.get == TERMINATED) {
- e match {
- // InterruptedIOException - thrown when an I/O operation is interrupted
- // ClosedByInterruptException - thrown when an I/O operation upon a channel is interrupted
- case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException =>
- true
- // The cause of the following exceptions may be one of the above exceptions:
- //
- // UncheckedIOException - thrown by codes that cannot throw a checked IOException, such as
- // BiFunction.apply
- // ExecutionException - thrown by codes running in a thread pool and these codes throw an
- // exception
- // UncheckedExecutionException - thrown by codes that cannot throw a checked
- // ExecutionException, such as BiFunction.apply
- case e2 @ (_: UncheckedIOException | _: ExecutionException | _: UncheckedExecutionException)
- if e2.getCause != null =>
- isInterruptedByStop(e2.getCause)
- case _ =>
- false
- }
+ StreamExecution.isInterruptionException(e)
} else {
false
}
@@ -565,6 +547,26 @@ abstract class StreamExecution(
object StreamExecution {
val QUERY_ID_KEY = "sql.streaming.queryId"
+
+ def isInterruptionException(e: Throwable): Boolean = e match {
+ // InterruptedIOException - thrown when an I/O operation is interrupted
+ // ClosedByInterruptException - thrown when an I/O operation upon a channel is interrupted
+ case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException =>
+ true
+ // The cause of the following exceptions may be one of the above exceptions:
+ //
+ // UncheckedIOException - thrown by codes that cannot throw a checked IOException, such as
+ // BiFunction.apply
+ // ExecutionException - thrown by codes running in a thread pool and these codes throw an
+ // exception
+ // UncheckedExecutionException - thrown by codes that cannot throw a checked
+ // ExecutionException, such as BiFunction.apply
+ case e2 @ (_: UncheckedIOException | _: ExecutionException | _: UncheckedExecutionException)
+ if e2.getCause != null =>
+ isInterruptionException(e2.getCause)
+ case _ =>
+ false
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org