You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/01/14 23:04:21 UTC

[spark] branch branch-3.1 updated: [SPARK-34116][SS][TEST] Separate state store numKeys metric test and validate metrics after committing

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 3814e02  [SPARK-34116][SS][TEST] Separate state store numKeys metric test and validate metrics after committing
3814e02 is described below

commit 3814e02100e389c7f4e59e2ee8487d81c138cb4b
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Thu Jan 14 15:03:22 2021 -0800

    [SPARK-34116][SS][TEST] Separate state store numKeys metric test and validate metrics after committing
    
    ### What changes were proposed in this pull request?
    
    This patch proposes to pull the test of `numKeys` metric into a separate test in `StateStoreSuite`.
    
    ### Why are the changes needed?
    
    Right now in `StateStoreSuite`, the tests of get/put/remove/commit are mixed with the `numKeys` metric test. I found it to be flaky when I was testing with another `StateStore` implementation.
    
    The current test logic is tightly bound to the in-memory map behavior of `HDFSBackedStateStore`. For example, a put shows up in the `numKeys` metric immediately.
    
    But for a `StateStore` implementation relying on external storage, e.g. RocksDB, the metric might be updated only once the data is actually committed. And `StateStoreSuite` should be a common test suite for all kinds of `StateStore` implementations.
    
    Specifically, we can still check these metrics after the state store is updated (committed). So I think we can refactor the test a little bit to make it easier to plug in other, external `StateStore` implementations.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, dev only.
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #31183 from viirya/SPARK-34116.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 0e64a22b28c93474f5aa0ba02b9300d8eccfc53c)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../streaming/state/StateStoreSuite.scala          | 29 ++++++++++++++++++----
 1 file changed, 24 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index d4cd3cd..b81e5b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -834,7 +834,6 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     // Verify state after updating
     put(store, "a", 1)
     assert(get(store, "a") === Some(1))
-    assert(store.metrics.numKeys === 1)
 
     assert(store.iterator().nonEmpty)
     assert(getLatestData(provider).isEmpty)
@@ -842,9 +841,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     // Make updates, commit and then verify state
     put(store, "b", 2)
     put(store, "aa", 3)
-    assert(store.metrics.numKeys === 3)
     remove(store, _.startsWith("a"))
-    assert(store.metrics.numKeys === 1)
     assert(store.commit() === 1)
 
     assert(store.hasCommitted)
@@ -862,15 +859,37 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     // New updates to the reloaded store with new version, and does not change old version
     val reloadedProvider = newStoreProvider(store.id)
     val reloadedStore = reloadedProvider.getStore(1)
-    assert(reloadedStore.metrics.numKeys === 1)
     put(reloadedStore, "c", 4)
-    assert(reloadedStore.metrics.numKeys === 2)
     assert(reloadedStore.commit() === 2)
     assert(rowsToSet(reloadedStore.iterator()) === Set("b" -> 2, "c" -> 4))
     assert(getLatestData(provider) === Set("b" -> 2, "c" -> 4))
     assert(getData(provider, version = 1) === Set("b" -> 2))
   }
 
+  testWithAllCodec("numKeys metrics") {
+    val provider = newStoreProvider()
+
+    // Verify state before starting a new set of updates
+    assert(getLatestData(provider).isEmpty)
+
+    val store = provider.getStore(0)
+    put(store, "a", 1)
+    put(store, "b", 2)
+    put(store, "c", 3)
+    put(store, "d", 4)
+    put(store, "e", 5)
+    assert(store.commit() === 1)
+    assert(store.metrics.numKeys === 5)
+    assert(rowsToSet(store.iterator()) === Set("a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4, "e" -> 5))
+
+    val reloadedProvider = newStoreProvider(store.id)
+    val reloadedStore = reloadedProvider.getStore(1)
+    remove(reloadedStore, _ == "b")
+    assert(reloadedStore.commit() === 2)
+    assert(reloadedStore.metrics.numKeys === 4)
+    assert(rowsToSet(reloadedStore.iterator()) === Set("a" -> 1, "c" -> 3, "d" -> 4, "e" -> 5))
+  }
+
   testWithAllCodec("removing while iterating") {
     val provider = newStoreProvider()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org