You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/01/18 18:52:49 UTC
spark git commit: [SPARK-19168][STRUCTURED STREAMING] StateStore
should be aborted upon error
Repository: spark
Updated Branches:
refs/heads/master c050c1227 -> 569e50680
[SPARK-19168][STRUCTURED STREAMING] StateStore should be aborted upon error
## What changes were proposed in this pull request?
We should call `StateStore.abort()` whenever an error occurs before the store has been committed.
## How was this patch tested?
Manually.
Author: Liwei Lin <lw...@gmail.com>
Closes #16547 from lw-lin/append-filter.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/569e5068
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/569e5068
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/569e5068
Branch: refs/heads/master
Commit: 569e50680f97b1ed054337a39fe198769ef52d93
Parents: c050c12
Author: Liwei Lin <lw...@gmail.com>
Authored: Wed Jan 18 10:52:47 2017 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Jan 18 10:52:47 2017 -0800
----------------------------------------------------------------------
.../spark/sql/execution/streaming/StatefulAggregate.scala | 8 ++++++++
.../streaming/state/HDFSBackedStateStoreProvider.scala | 2 +-
.../spark/sql/execution/streaming/state/StateStore.scala | 2 +-
3 files changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/569e5068/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index 0551e4b..d4ccced 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.streaming.state._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
+import org.apache.spark.TaskContext
/** Used to identify the state store for a given operator. */
@@ -150,6 +151,13 @@ case class StateStoreSaveExec(
val numTotalStateRows = longMetric("numTotalStateRows")
val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+ // Abort the state store in case of error
+ TaskContext.get().addTaskCompletionListener(_ => {
+ if (!store.hasCommitted) {
+ store.abort()
+ }
+ })
+
outputMode match {
// Update and output all rows in the StateStore.
case Some(Complete) =>
http://git-wip-us.apache.org/repos/asf/spark/blob/569e5068/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 4f3f818..1279b71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -203,7 +203,7 @@ private[state] class HDFSBackedStateStoreProvider(
/**
* Whether all updates have been committed
*/
- override private[state] def hasCommitted: Boolean = {
+ override private[streaming] def hasCommitted: Boolean = {
state == COMMITTED
}
http://git-wip-us.apache.org/repos/asf/spark/blob/569e5068/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 9bc6c0e..d59746f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -83,7 +83,7 @@ trait StateStore {
/**
* Whether all updates have been committed
*/
- private[state] def hasCommitted: Boolean
+ private[streaming] def hasCommitted: Boolean
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org