You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/01/14 23:04:21 UTC
[spark] branch branch-3.1 updated: [SPARK-34116][SS][TEST] Separate
state store numKeys metric test and validate metrics after committing
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 3814e02 [SPARK-34116][SS][TEST] Separate state store numKeys metric test and validate metrics after committing
3814e02 is described below
commit 3814e02100e389c7f4e59e2ee8487d81c138cb4b
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Thu Jan 14 15:03:22 2021 -0800
[SPARK-34116][SS][TEST] Separate state store numKeys metric test and validate metrics after committing
### What changes were proposed in this pull request?
This patch proposes to pull the test of `numKeys` metric into a separate test in `StateStoreSuite`.
### Why are the changes needed?
Right now in `StateStoreSuite`, the tests of get/put/remove/commit are mixed with the `numKeys` metric test. I found it is flaky when I was testing with another `StateStore` implementation.
Current test logic is tightly bound to the in-memory map behavior of `HDFSBackedStateStore`. For example, put can immediately show up in the `numKeys` metric.
But for a `StateStore` implementation relying on external storage, e.g. RocksDB, the metric might be updated once the data is actually committed. And `StateStoreSuite` should be a common test suite for all kinds of StateStore implementations.
Specifically, we are also able to check these metrics after the state store is updated (committed). So I think we can refactor the test a little bit to make it easier to incorporate other `StateStore` implementations externally.
### Does this PR introduce _any_ user-facing change?
No, dev only.
### How was this patch tested?
Unit test.
Closes #31183 from viirya/SPARK-34116.
Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
(cherry picked from commit 0e64a22b28c93474f5aa0ba02b9300d8eccfc53c)
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../streaming/state/StateStoreSuite.scala | 29 ++++++++++++++++++----
1 file changed, 24 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index d4cd3cd..b81e5b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -834,7 +834,6 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
// Verify state after updating
put(store, "a", 1)
assert(get(store, "a") === Some(1))
- assert(store.metrics.numKeys === 1)
assert(store.iterator().nonEmpty)
assert(getLatestData(provider).isEmpty)
@@ -842,9 +841,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
// Make updates, commit and then verify state
put(store, "b", 2)
put(store, "aa", 3)
- assert(store.metrics.numKeys === 3)
remove(store, _.startsWith("a"))
- assert(store.metrics.numKeys === 1)
assert(store.commit() === 1)
assert(store.hasCommitted)
@@ -862,15 +859,37 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
// New updates to the reloaded store with new version, and does not change old version
val reloadedProvider = newStoreProvider(store.id)
val reloadedStore = reloadedProvider.getStore(1)
- assert(reloadedStore.metrics.numKeys === 1)
put(reloadedStore, "c", 4)
- assert(reloadedStore.metrics.numKeys === 2)
assert(reloadedStore.commit() === 2)
assert(rowsToSet(reloadedStore.iterator()) === Set("b" -> 2, "c" -> 4))
assert(getLatestData(provider) === Set("b" -> 2, "c" -> 4))
assert(getData(provider, version = 1) === Set("b" -> 2))
}
+ testWithAllCodec("numKeys metrics") {
+ val provider = newStoreProvider()
+
+ // Verify state before starting a new set of updates
+ assert(getLatestData(provider).isEmpty)
+
+ val store = provider.getStore(0)
+ put(store, "a", 1)
+ put(store, "b", 2)
+ put(store, "c", 3)
+ put(store, "d", 4)
+ put(store, "e", 5)
+ assert(store.commit() === 1)
+ assert(store.metrics.numKeys === 5)
+ assert(rowsToSet(store.iterator()) === Set("a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4, "e" -> 5))
+
+ val reloadedProvider = newStoreProvider(store.id)
+ val reloadedStore = reloadedProvider.getStore(1)
+ remove(reloadedStore, _ == "b")
+ assert(reloadedStore.commit() === 2)
+ assert(reloadedStore.metrics.numKeys === 4)
+ assert(rowsToSet(reloadedStore.iterator()) === Set("a" -> 1, "c" -> 3, "d" -> 4, "e" -> 5))
+ }
+
testWithAllCodec("removing while iterating") {
val provider = newStoreProvider()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org