Posted to commits@spark.apache.org by ge...@apache.org on 2021/08/31 16:56:15 UTC

[spark] branch branch-3.2 updated: [SPARK-36619][SS] Fix bugs around prefix-scan for HDFS backed state store and RocksDB state store

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 9a71c4c  [SPARK-36619][SS] Fix bugs around prefix-scan for HDFS backed state store and RocksDB state store
9a71c4c is described below

commit 9a71c4ca840cd74f0085f2f2f4ed879eac976acc
Author: Jungtaek Lim <ka...@gmail.com>
AuthorDate: Wed Sep 1 00:51:45 2021 +0800

    [SPARK-36619][SS] Fix bugs around prefix-scan for HDFS backed state store and RocksDB state store
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to fix bugs around prefix-scan for both HDFS backed state store and RocksDB state store.
    
    > HDFS backed state store
    
    We did "shallow-copy" on copying prefix map, which leads the values of prefix map (mutable Set) to be "same instances" across multiple versions. This PR fixes it via creating a new mutable Set and copying elements.
    
    > RocksDB state store
    
    Prefix-scan iterators are only closed on RocksDB.rollback(), which is only called in RocksDBStateStore.abort().
    
    While the `RocksDBStateStore.abort()` method is called for streaming session window (since it has two physical plans, one for read and one for write), other stateful operators, which have a single read-write physical plan, call either commit or abort, and do not close the iterators on commit. These unclosed iterators can be "reused" and produce incorrect outputs.
    
    This PR ensures that prefix-scan iterators are also reset on loading RocksDB, which was previously done only in rollback.
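    
    A minimal sketch of the intended lifecycle (the hypothetical `Iter` trait and cache key type are stand-ins for the RocksDB iterator wrapper; the structure mirrors the change in the diff below):
    
        import java.util.concurrent.ConcurrentHashMap
        import scala.collection.JavaConverters._
    
        trait Iter { def close(): Unit }
    
        class Db {
          // Prefix-scan iterators are cached and reused across prefixScan() calls.
          private val prefixScanReuseIter = new ConcurrentHashMap[Long, Iter]()
    
          private def closePrefixScanIterators(): Unit = {
            prefixScanReuseIter.entrySet().asScala.foreach(_.getValue.close())
            prefixScanReuseIter.clear()
          }
    
          def load(version: Long): Unit = {
            // ... restore the checkpoint for `version` ...
            // Close iterators opened against the previously loaded version, so a
            // commit-only operator cannot reuse stale iterators on the new one.
            closePrefixScanIterators()
          }
    
          def rollback(): Unit = closePrefixScanIterators() // as before this PR
          def close(): Unit = closePrefixScanIterators()    // as before this PR
        }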
    
    ### Why are the changes needed?
    
    Please refer to the above section for an explanation of the bugs and their fixes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Modified a unit test which fails without this PR and passes with it.
    
    Closes #33870 from HeartSaVioR/SPARK-36619.
    
    Authored-by: Jungtaek Lim <ka...@gmail.com>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
    (cherry picked from commit 60a72c938a62092294e2ae30314a2c1a0222dd2b)
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 .../streaming/state/HDFSBackedStateStoreMap.scala  |  9 +++-
 .../sql/execution/streaming/state/RocksDB.scala    | 13 +++--
 .../streaming/state/StateStoreSuite.scala          | 56 +++++++++++++++++-----
 3 files changed, 60 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreMap.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreMap.scala
index 73608d4..9a0b6a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreMap.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreMap.scala
@@ -129,7 +129,14 @@ class PrefixScannableHDFSBackedStateStoreMap(
     other match {
       case o: PrefixScannableHDFSBackedStateStoreMap =>
         map.putAll(o.map)
-        prefixKeyToKeysMap.putAll(o.prefixKeyToKeysMap)
+        o.prefixKeyToKeysMap.asScala.foreach { case (prefixKey, keySet) =>
+          // Create a copy of the Set here. Shallow-copying the prefix key map would leave
+          // both maps sharing the same Set "instances" as values, meaning that modifying
+          // the prefix map in the newer version would also affect the older version.
+          val newSet = new mutable.HashSet[UnsafeRow]()
+          newSet ++= keySet
+          prefixKeyToKeysMap.put(prefixKey, newSet)
+        }
 
       case _ => other.iterator().foreach { pair => put(pair.key, pair.value) }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index e9ef6e5..6004bdb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -120,6 +120,8 @@ class RocksDB(
       if (conf.resetStatsOnLoad) {
         nativeStats.reset
       }
+      // Reset resources to prevent side effects from the previously loaded version
+      closePrefixScanIterators()
       writeBatch.clear()
       logInfo(s"Loaded $version")
     } catch {
@@ -290,8 +292,7 @@ class RocksDB(
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
-    prefixScanReuseIter.entrySet().asScala.foreach(_.getValue.close())
-    prefixScanReuseIter.clear()
+    closePrefixScanIterators()
     writeBatch.clear()
     numKeysOnWritingVersion = numKeysOnLoadedVersion
     release()
@@ -307,8 +308,7 @@ class RocksDB(
 
   /** Release all resources */
   def close(): Unit = {
-    prefixScanReuseIter.entrySet().asScala.foreach(_.getValue.close())
-    prefixScanReuseIter.clear()
+    closePrefixScanIterators()
     try {
       closeDB()
 
@@ -411,6 +411,11 @@ class RocksDB(
     acquireLock.notifyAll()
   }
 
+  private def closePrefixScanIterators(): Unit = {
+    prefixScanReuseIter.entrySet().asScala.foreach(_.getValue.close())
+    prefixScanReuseIter.clear()
+  }
+
   private def getDBProperty(property: String): Long = {
     db.getProperty(property).toLong
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 05772cd..8a6f66d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -803,27 +803,57 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     // Verify state before starting a new set of updates
     assert(getLatestData(provider).isEmpty)
 
-    val store = provider.getStore(0)
+    var store = provider.getStore(0)
 
-    val key1 = Seq("a", "b", "c")
-    val key2 = Seq(1, 2, 3)
-    val keys = for (k1 <- key1; k2 <- key2) yield (k1, k2)
+    def putCompositeKeys(keys: Seq[(String, Int)]): Unit = {
+      val randomizedKeys = scala.util.Random.shuffle(keys.toList)
+      randomizedKeys.foreach { case (key1, key2) =>
+        put(store, key1, key2, key2)
+      }
+    }
 
-    val randomizedKeys = scala.util.Random.shuffle(keys.toList)
+    def verifyScan(key1: Seq[String], key2: Seq[Int]): Unit = {
+      key1.foreach { k1 =>
+        val keyValueSet = store.prefixScan(dataToPrefixKeyRow(k1)).map { pair =>
+          rowPairToDataPair(pair.withRows(pair.key.copy(), pair.value.copy()))
+        }.toSet
 
-    randomizedKeys.foreach { case (key1, key2) =>
-      put(store, key1, key2, key2)
+        assert(keyValueSet === key2.map(k2 => ((k1, k2), k2)).toSet)
+      }
     }
 
-    key1.foreach { k1 =>
-      val keyValueSet = store.prefixScan(dataToPrefixKeyRow(k1)).map { pair =>
-        rowPairToDataPair(pair.withRows(pair.key.copy(), pair.value.copy()))
-      }.toSet
+    val key1AtVersion0 = Seq("a", "b", "c")
+    val key2AtVersion0 = Seq(1, 2, 3)
+    val keysAtVersion0 = for (k1 <- key1AtVersion0; k2 <- key2AtVersion0) yield (k1, k2)
 
-      assert(keyValueSet === key2.map(k2 => ((k1, k2), k2)).toSet)
-    }
+    putCompositeKeys(keysAtVersion0)
+    verifyScan(key1AtVersion0, key2AtVersion0)
 
     assert(store.prefixScan(dataToPrefixKeyRow("non-exist")).isEmpty)
+
+    // commit, then load version 1 (the version just committed)
+    store.commit()
+    store = provider.getStore(1)
+
+    // before putting the new key-value pairs, verify prefix scan works for existing keys
+    verifyScan(key1AtVersion0, key2AtVersion0)
+
+    val key1AtVersion1 = Seq("c", "d")
+    val key2AtVersion1 = Seq(4, 5, 6)
+    val keysAtVersion1 = for (k1 <- key1AtVersion1; k2 <- key2AtVersion1) yield (k1, k2)
+
+    // put new key-value pairs, and verify that prefix scan reflects the changes
+    putCompositeKeys(keysAtVersion1)
+    verifyScan(Seq("c"), Seq(1, 2, 3, 4, 5, 6))
+    verifyScan(Seq("d"), Seq(4, 5, 6))
+
+    // abort, then load version 1 again (keysAtVersion1 should be rolled back)
+    store.abort()
+    store = provider.getStore(1)
+
+    // prefix scan should not reflect the uncommitted changes
+    verifyScan(key1AtVersion0, key2AtVersion0)
+    verifyScan(Seq("d"), Seq.empty)
   }
 
   testWithAllCodec("numKeys metrics") {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org