You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2017/07/12 07:39:13 UTC
spark git commit: [SPARK-21370][SS] Add test for state reliability
when one read-only state store aborts after read-write state store commits
Repository: spark
Updated Branches:
refs/heads/master e16e8c7ad -> e0af76a36
[SPARK-21370][SS] Add test for state reliability when one read-only state store aborts after read-write state store commits
## What changes were proposed in this pull request?
During Streaming Aggregation, we have two StateStores per task: one used as read-only in
`StateStoreRestoreExec`, and one used as read-write in `StateStoreSaveExec`. `StateStore.abort`
will be called for these StateStores if they haven't committed their results. We need to
make sure that an `abort` in the read-only store after a `commit` in the read-write store
doesn't accidentally lead to the deletion of state.
This PR adds a test for this condition.
## How was this patch tested?
This PR adds a test.
Author: Burak Yavuz <br...@gmail.com>
Closes #18603 from brkyvz/ss-test.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0af76a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0af76a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0af76a3
Branch: refs/heads/master
Commit: e0af76a36a67d409776bd379c6d6ef6d60356c06
Parents: e16e8c7
Author: Burak Yavuz <br...@gmail.com>
Authored: Wed Jul 12 00:39:09 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Jul 12 00:39:09 2017 -0700
----------------------------------------------------------------------
.../streaming/state/StateStoreSuite.scala | 31 ++++++++++++++++++++
1 file changed, 31 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e0af76a3/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index c2087ec..7cb86dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -665,6 +665,37 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
checkInvalidVersion(3)
}
+ test("two concurrent StateStores - one for read-only and one for read-write") {
+ // During Streaming Aggregation, we have two StateStores per task, one used as read-only in
+ // `StateStoreRestoreExec`, and one read-write used in `StateStoreSaveExec`. `StateStore.abort`
+ // will be called for these StateStores if they haven't committed their results. We need to
+ // make sure that `abort` in read-only store after a `commit` in the read-write store doesn't
+ // accidentally lead to the deletion of state.
+ val dir = newDir()
+ val storeId = StateStoreId(dir, 0L, 1)
+ val provider0 = newStoreProvider(storeId)
+ // prime state
+ val store = provider0.getStore(0)
+ val key = "a"
+ put(store, key, 1)
+ store.commit()
+ assert(rowsToSet(store.iterator()) === Set(key -> 1))
+
+ // two state stores
+ val provider1 = newStoreProvider(storeId)
+ val restoreStore = provider1.getStore(1)
+ val saveStore = provider1.getStore(1)
+
+ put(saveStore, key, get(restoreStore, key).get + 1)
+ saveStore.commit()
+ restoreStore.abort()
+
+ // check that state is correct for next batch
+ val provider2 = newStoreProvider(storeId)
+ val finalStore = provider2.getStore(2)
+ assert(rowsToSet(finalStore.iterator()) === Set(key -> 2))
+ }
+
/** Return a new provider with a random id */
def newStoreProvider(): ProviderClass
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org