You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/02/15 08:59:49 UTC

spark git commit: [SPARK-23419][SPARK-23416][SS] data source v2 write path should re-throw interruption exceptions directly

Repository: spark
Updated Branches:
  refs/heads/master 95e4b4916 -> f38c76063


[SPARK-23419][SPARK-23416][SS] data source v2 write path should re-throw interruption exceptions directly

## What changes were proposed in this pull request?

Streaming execution keeps a list of exceptions that signify interruption and handles them specially. `WriteToDataSourceV2Exec` should also respect this list and not wrap these exceptions in `SparkException`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <we...@databricks.com>

Closes #20605 from cloud-fan/write.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f38c7606
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f38c7606
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f38c7606

Branch: refs/heads/master
Commit: f38c760638063f1fb45e9ee2c772090fb203a4a0
Parents: 95e4b49
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu Feb 15 16:59:44 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Feb 15 16:59:44 2018 +0800

----------------------------------------------------------------------
 .../datasources/v2/WriteToDataSourceV2.scala    | 11 +++++-
 .../execution/streaming/StreamExecution.scala   | 40 ++++++++++----------
 2 files changed, 31 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f38c7606/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index 535e796..41cdfc8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import scala.util.control.NonFatal
+
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.internal.Logging
@@ -27,6 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
@@ -107,7 +110,13 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e
             throw new SparkException("Writing job failed.", cause)
         }
         logError(s"Data source writer $writer aborted.")
-        throw new SparkException("Writing job aborted.", cause)
+        cause match {
+          // Do not wrap interruption exceptions that will be handled by streaming specially.
+          case _ if StreamExecution.isInterruptionException(cause) => throw cause
+          // Only wrap non fatal exceptions.
+          case NonFatal(e) => throw new SparkException("Writing job aborted.", e)
+          case _ => throw cause
+        }
     }
 
     sparkContext.emptyRDD

http://git-wip-us.apache.org/repos/asf/spark/blob/f38c7606/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index e7982d7..3fc8c78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -356,25 +356,7 @@ abstract class StreamExecution(
 
   private def isInterruptedByStop(e: Throwable): Boolean = {
     if (state.get == TERMINATED) {
-      e match {
-        // InterruptedIOException - thrown when an I/O operation is interrupted
-        // ClosedByInterruptException - thrown when an I/O operation upon a channel is interrupted
-        case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException =>
-          true
-        // The cause of the following exceptions may be one of the above exceptions:
-        //
-        // UncheckedIOException - thrown by codes that cannot throw a checked IOException, such as
-        //                        BiFunction.apply
-        // ExecutionException - thrown by codes running in a thread pool and these codes throw an
-        //                      exception
-        // UncheckedExecutionException - thrown by codes that cannot throw a checked
-        //                               ExecutionException, such as BiFunction.apply
-        case e2 @ (_: UncheckedIOException | _: ExecutionException | _: UncheckedExecutionException)
-          if e2.getCause != null =>
-          isInterruptedByStop(e2.getCause)
-        case _ =>
-          false
-      }
+      StreamExecution.isInterruptionException(e)
     } else {
       false
     }
@@ -565,6 +547,26 @@ abstract class StreamExecution(
 
 object StreamExecution {
   val QUERY_ID_KEY = "sql.streaming.queryId"
+
+  def isInterruptionException(e: Throwable): Boolean = e match {
+    // InterruptedIOException - thrown when an I/O operation is interrupted
+    // ClosedByInterruptException - thrown when an I/O operation upon a channel is interrupted
+    case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException =>
+      true
+    // The cause of the following exceptions may be one of the above exceptions:
+    //
+    // UncheckedIOException - thrown by codes that cannot throw a checked IOException, such as
+    //                        BiFunction.apply
+    // ExecutionException - thrown by codes running in a thread pool and these codes throw an
+    //                      exception
+    // UncheckedExecutionException - thrown by codes that cannot throw a checked
+    //                               ExecutionException, such as BiFunction.apply
+    case e2 @ (_: UncheckedIOException | _: ExecutionException | _: UncheckedExecutionException)
+        if e2.getCause != null =>
+      isInterruptionException(e2.getCause)
+    case _ =>
+      false
+  }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org