You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2017/07/12 07:39:13 UTC

spark git commit: [SPARK-21370][SS] Add test for state reliability when one read-only state store aborts after read-write state store commits

Repository: spark
Updated Branches:
  refs/heads/master e16e8c7ad -> e0af76a36


[SPARK-21370][SS] Add test for state reliability when one read-only state store aborts after read-write state store commits

## What changes were proposed in this pull request?

During Streaming Aggregation, we have two StateStores per task, one used as read-only in
`StateStoreRestoreExec`, and one read-write used in `StateStoreSaveExec`. `StateStore.abort`
will be called for these StateStores if they haven't committed their results. We need to
make sure that `abort` in read-only store after a `commit` in the read-write store doesn't
accidentally lead to the deletion of state.

This PR adds a test for this condition.

## How was this patch tested?

This PR adds a test.

Author: Burak Yavuz <br...@gmail.com>

Closes #18603 from brkyvz/ss-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0af76a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0af76a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0af76a3

Branch: refs/heads/master
Commit: e0af76a36a67d409776bd379c6d6ef6d60356c06
Parents: e16e8c7
Author: Burak Yavuz <br...@gmail.com>
Authored: Wed Jul 12 00:39:09 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Jul 12 00:39:09 2017 -0700

----------------------------------------------------------------------
 .../streaming/state/StateStoreSuite.scala       | 31 ++++++++++++++++++++
 1 file changed, 31 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e0af76a3/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index c2087ec..7cb86dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -665,6 +665,37 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     checkInvalidVersion(3)
   }
 
+  test("two concurrent StateStores - one for read-only and one for read-write") {
+    // During Streaming Aggregation, a task holds two StateStores: a read-only one used by
+    // `StateStoreRestoreExec` and a read-write one used by `StateStoreSaveExec`. `StateStore.abort`
+    // will be called for these StateStores if they haven't committed their results. We need to
+    // make sure that `abort` on the read-only store after a `commit` on the read-write store
+    // doesn't accidentally lead to the deletion of the committed state.
+    val dir = newDir()
+    val storeId = StateStoreId(dir, 0L, 1)
+    val provider0 = newStoreProvider(storeId)
+    // prime state: write key -> 1 on top of version 0 and commit (presumably yielding version 1)
+    val store = provider0.getStore(0)
+    val key = "a"
+    put(store, key, 1)
+    store.commit()
+    assert(rowsToSet(store.iterator()) === Set(key -> 1))
+
+    // two stores opened on the same version from a fresh provider: one is only read, one is written
+    val provider1 = newStoreProvider(storeId)
+    val restoreStore = provider1.getStore(1)
+    val saveStore = provider1.getStore(1)
+
+    put(saveStore, key, get(restoreStore, key).get + 1)  // read old value, write it back + 1
+    saveStore.commit()
+    restoreStore.abort()  // the abort deliberately happens AFTER the other store's commit
+
+    // next batch: a fresh provider must still see the committed update despite the later abort
+    val provider2 = newStoreProvider(storeId)
+    val finalStore = provider2.getStore(2)
+    assert(rowsToSet(finalStore.iterator()) === Set(key -> 2))
+  }
+
   /** Return a new provider with a random id */
   def newStoreProvider(): ProviderClass
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org