You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/01/18 18:52:49 UTC

spark git commit: [SPARK-19168][STRUCTURED STREAMING] StateStore should be aborted upon error

Repository: spark
Updated Branches:
  refs/heads/master c050c1227 -> 569e50680


[SPARK-19168][STRUCTURED STREAMING] StateStore should be aborted upon error

## What changes were proposed in this pull request?

We should call `StateStore.abort()` if any error occurs before the store is committed.

## How was this patch tested?

Manually.

Author: Liwei Lin <lw...@gmail.com>

Closes #16547 from lw-lin/append-filter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/569e5068
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/569e5068
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/569e5068

Branch: refs/heads/master
Commit: 569e50680f97b1ed054337a39fe198769ef52d93
Parents: c050c12
Author: Liwei Lin <lw...@gmail.com>
Authored: Wed Jan 18 10:52:47 2017 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Jan 18 10:52:47 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/execution/streaming/StatefulAggregate.scala    | 8 ++++++++
 .../streaming/state/HDFSBackedStateStoreProvider.scala       | 2 +-
 .../spark/sql/execution/streaming/state/StateStore.scala     | 2 +-
 3 files changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/569e5068/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index 0551e4b..d4ccced 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.TaskContext
 
 
 /** Used to identify the state store for a given operator. */
@@ -150,6 +151,13 @@ case class StateStoreSaveExec(
         val numTotalStateRows = longMetric("numTotalStateRows")
         val numUpdatedStateRows = longMetric("numUpdatedStateRows")
 
+        // Abort the state store in case of error
+        TaskContext.get().addTaskCompletionListener(_ => {
+          if (!store.hasCommitted) {
+            store.abort()
+          }
+        })
+
         outputMode match {
           // Update and output all rows in the StateStore.
           case Some(Complete) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/569e5068/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 4f3f818..1279b71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -203,7 +203,7 @@ private[state] class HDFSBackedStateStoreProvider(
     /**
      * Whether all updates have been committed
      */
-    override private[state] def hasCommitted: Boolean = {
+    override private[streaming] def hasCommitted: Boolean = {
       state == COMMITTED
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/569e5068/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 9bc6c0e..d59746f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -83,7 +83,7 @@ trait StateStore {
   /**
    * Whether all updates have been committed
    */
-  private[state] def hasCommitted: Boolean
+  private[streaming] def hasCommitted: Boolean
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org