Posted to reviews@spark.apache.org by "anishshri-db (via GitHub)" <gi...@apache.org> on 2023/04/27 19:29:58 UTC

[GitHub] [spark] anishshri-db opened a new pull request, #40981: [SPARK-43311] Add RocksDB state store memory management enhancements

anishshri-db opened a new pull request, #40981:
URL: https://github.com/apache/spark/pull/40981

   ### What changes were proposed in this pull request?
   Add RocksDB state store memory management enhancements
   
   This change does the following:
   
   - remove the use of writeBatchWithIndex
   - move towards using native RocksDB operations
   - remove the use of the RocksDB WAL
   - add support for bounding memory usage for all RocksDB state store instances on an executor using the write buffer manager (see the sketch after this list)
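   
   As a rough illustration of that last item (a minimal sketch only; the object name, helper name, and sizes below are made up and are not the PR's actual code or configs), all instances on an executor can share one LRUCache and one WriteBufferManager so their combined memtable and block cache usage stays under a single cap:
   
   ```scala
   import org.rocksdb.{BlockBasedTableConfig, LRUCache, Options, RocksDB, WriteBufferManager}
   
   object SharedRocksDBMemory {
     RocksDB.loadLibrary()
   
     // One cache and one write buffer manager shared by every instance on the executor.
     private val totalMemoryBytes = 512L * 1024 * 1024            // illustrative cap
     private val sharedCache = new LRUCache(totalMemoryBytes)
     private val writeBufferManager =
       new WriteBufferManager(totalMemoryBytes / 2, sharedCache)  // memtable usage charged to the cache
   
     def newBoundedOptions(): Options = {
       val tableConfig = new BlockBasedTableConfig().setBlockCache(sharedCache)
       new Options()
         .setCreateIfMissing(true)
         .setWriteBufferManager(writeBufferManager)
         .setTableFormatConfig(tableConfig)
     }
   }
   ```
   
   Each state store instance would then open its local RocksDB with `SharedRocksDBMemory.newBoundedOptions()`, so no single instance can grow past the shared budget.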
   
   ### Why are the changes needed?
   Today, when RocksDB is used as a state store provider, memory usage when writing via the writeBatch is not capped. A related issue is that the state store coordinator can create multiple RocksDB instances on a single node without enforcing a global limit on native memory usage. Due to these issues we could run into OOMs and task failures.
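   
   For context, here is a small standalone illustration (not part of this PR; the path is made up) of the native memory pools involved. RocksJava exposes properties that show what an uncapped instance is consuming, and these memtable and block cache pools are what a shared write buffer manager is meant to bound globally:
   
   ```scala
   import org.rocksdb.{Options, RocksDB}
   
   RocksDB.loadLibrary()
   val db = RocksDB.open(new Options().setCreateIfMissing(true), "/tmp/rocksdb-mem-demo")
   println(db.getProperty("rocksdb.cur-size-all-mem-tables"))  // memtable bytes for this instance
   println(db.getProperty("rocksdb.block-cache-usage"))        // block cache bytes for this instance
   db.close()
   ```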
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Added unit tests and fixed existing ones
   
   RocksDBStateStoreSuite
   ```
   [info] Run completed in 40 seconds, 916 milliseconds.
   [info] Total number of tests run: 33
   [info] Suites: completed 1, aborted 0
   [info] Tests: succeeded 33, failed 0, canceled 0, ignored 0, pending 0
   [info] All tests passed.
   ```
   
   StateStoreSuite
   ```
   [info] Run completed in 2 minutes, 33 seconds.
   [info] Total number of tests run: 85
   [info] Suites: completed 1, aborted 0
   [info] Tests: succeeded 85, failed 0, canceled 0, ignored 0, pending 0
   [info] All tests passed.
   ```
   




[GitHub] [spark] anishshri-db commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179958267


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   Yea, the use-case seems quite narrow and might not be safe if it's passed around too. I would probably prefer to remove this. Lmk what you think?





[GitHub] [spark] anishshri-db commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179873218


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   Yes, correct. We close it in commit & rollback. In this case, the call is made twice though: once to check values from the previous batch, and then for the newly added values in the current batch.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179859324


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   My memory is broken but there should be a reason I didn't follow the cleanup approach of iterator in iterator(). The cleanup approach of iterator() relies on task completion listener, and I roughly remember there was an issue with iterator in prefixScan so I couldn't do the same. 
   
   If that was related to WriteBatch then we may not need this anymore and just follow the approach of iterator(), but honestly I didn't track down the issue as such.
   
   That said, if we remove this, we probably need to at least employ the cleanup approach we use in iterator().
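   
   A rough sketch of that cleanup approach (assumed names, not the actual RocksDB.scala members): track every native iterator handed out by prefixScan and close it either from commit()/rollback() or when the Spark task completes, instead of caching it per thread for reuse:
   
   ```scala
   import java.util.concurrent.ConcurrentHashMap
   
   import org.apache.spark.TaskContext
   import org.rocksdb.RocksIterator
   
   class IteratorCleanupSketch {
     private val openIterators =
       java.util.Collections.newSetFromMap(new ConcurrentHashMap[RocksIterator, java.lang.Boolean]())
   
     def trackAndCloseOnTaskCompletion(iter: RocksIterator): RocksIterator = {
       openIterators.add(iter)
       Option(TaskContext.get()).foreach { ctx =>
         ctx.addTaskCompletionListener[Unit] { _ =>
           if (openIterators.remove(iter)) iter.close()  // skip if commit/rollback already closed it
         }
       }
       iter
     }
   
     // Called from commit() / rollback() to close anything still open.
     def closeAllOpenIterators(): Unit = {
       openIterators.forEach(_.close())
       openIterators.clear()
     }
   }
   ```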






[GitHub] [spark] anishshri-db commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1180848520


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -61,17 +60,27 @@ class RocksDB(
 
   // Java wrapper objects linking to native RocksDB objects
   private val readOptions = new ReadOptions()  // used for gets
-  private val writeOptions = new WriteOptions().setSync(true)  // wait for batched write to complete

Review Comment:
   We can't use sync since the WAL is disabled.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179859324


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   My memory is broken but there should be a reason I didn't follow the cleanup approach of iterator in iterator(). The cleanup approach of iterator() relies on task completion listener, and I roughly remember there was an issue with iterator in prefixScan so I couldn't do the same. 
   
   If that was related to WriteBatch then we may not need this anymore and just follow the approach of iterator(), but honestly I didn't track down the issue as such.
   
   That said, if we remove this, we probably need to employ the cleanup approach we use in iterator().





[GitHub] [spark] anishshri-db commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1180883584


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -348,9 +340,8 @@ class RocksDB(
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
-    closePrefixScanIterators()
-    resetWriteBatch()
     numKeysOnWritingVersion = numKeysOnLoadedVersion
+    loadedVersion = -1L

Review Comment:
   I don't think we need to do anything explicitly here. Basically, the call to load the RocksDB version from DFS should ensure that the appropriate cleanup is done.
   
   ```
       val metadata = if (version == 0) {
         if (localDir.exists) Utils.deleteRecursively(localDir)
         localDir.mkdirs()
         RocksDBCheckpointMetadata(Seq.empty, 0)
       } else {
         // Delete all non-immutable files in local dir, and unzip new ones from DFS commit file
         listRocksDBFiles(localDir)._2.foreach(_.delete())
   ```
   
   If the instance is removed, we would eventually call `close` which would delete the base root dir itself.
   
   cc - @HeartSaVioR - in case there is any condition I may be missing here





[GitHub] [spark] anishshri-db commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1180848307


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   Done - added the close and cleanup logic to mimic iterator()





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179869841


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   It is simpler than that - the cached prefixScan iterator does not live across microbatches. We close it in both commit and rollback. So it's not really a cache but closer to reference tracking.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1181325162


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -348,9 +340,8 @@ class RocksDB(
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
-    closePrefixScanIterators()
-    resetWriteBatch()
     numKeysOnWritingVersion = numKeysOnLoadedVersion
+    loadedVersion = -1L

Review Comment:
   I guess we are just reusing the re-load mechanism, which has existed for a while. If anyone figures out a possible bug then it'd be awesome to have a fix, but that sounds out of scope to me even if there is a bug.





[GitHub] [spark] HeartSaVioR commented on pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #40981:
URL: https://github.com/apache/spark/pull/40981#issuecomment-1530068162

   Thanks! Merging to master.




[GitHub] [spark] HeartSaVioR closed pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements
URL: https://github.com/apache/spark/pull/40981




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179895611


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   My bad, you're right, I forgot about it. There are two places where prefix scan is used (in the same task), and I intentionally didn't close the iterator per usage so that the underlying iterator could be reused between them. But that optimization seems dangerous, as it will lead to a correctness issue if there is any possibility of the two usages hitting the iterator at the same time. If the benefit we get from a dangerous optimization is not huge, I'd lean towards removing it. We just need to revisit the resource cleanup for the prefix scan iterator then.





[GitHub] [spark] siying commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "siying (via GitHub)" <gi...@apache.org>.
siying commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1180870219


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   I would follow your intuition that removing the optimization won't cause a big issue. I guess if we kept it, the logic might be more complicated, so it makes sense.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -61,17 +60,27 @@ class RocksDB(
 
   // Java wrapper objects linking to native RocksDB objects
   private val readOptions = new ReadOptions()  // used for gets
-  private val writeOptions = new WriteOptions().setSync(true)  // wait for batched write to complete

Review Comment:
   It doesn't matter, but for the record, I don't see a reason why we needed to do sync in the first place. WriteOptions.sync is only there to keep data preserved across a machine power reset.





[GitHub] [spark] anishshri-db commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1180848520


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -61,17 +60,27 @@ class RocksDB(
 
   // Java wrapper objects linking to native RocksDB objects
   private val readOptions = new ReadOptions()  // used for gets
-  private val writeOptions = new WriteOptions().setSync(true)  // wait for batched write to complete

Review Comment:
   Not sure I understand your question. We can't use sync since the WAL is disabled.





[GitHub] [spark] Kimahriman commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "Kimahriman (via GitHub)" <gi...@apache.org>.
Kimahriman commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1180404675


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -61,17 +60,27 @@ class RocksDB(
 
   // Java wrapper objects linking to native RocksDB objects
   private val readOptions = new ReadOptions()  // used for gets
-  private val writeOptions = new WriteOptions().setSync(true)  // wait for batched write to complete

Review Comment:
   Is this not needed because when we zip things up we'll see anything in the operating system cache anyway?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -348,9 +340,8 @@ class RocksDB(
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
-    closePrefixScanIterators()
-    resetWriteBatch()
     numKeysOnWritingVersion = numKeysOnLoadedVersion
+    loadedVersion = -1L

Review Comment:
   Do any local files need to be cleaned up if they've been written to disk during the task?





[GitHub] [spark] anishshri-db commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179867179


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   Yea, so the previous model assumed that the iterator could reflect changes to the writeBatch for the given thread. That is no longer true with this change.
   
   So basically the following would fail if we were to use the cached iterator:
   - Load version 0
   - Add kv pairs (c, 1), (c, 2), (c, 3)
   - Commit version 0
   - Load version 1
   - Call prefix scan and load the map
   - Add kv pairs (c, 4), (c, 5), (c, 6)
   - Read key-values: if we rely only on the old iterator cached in the map, we read only (c, 1), (c, 2), (c, 3). With the old writeBatch in place, the iterator was valid for the writeBatch as well, which is why we could read all the kv pairs correctly.
   
   Sample failure:
   ```
   [info]   Set((("c", 1), 1), (("c", 2), 2), (("c", 3), 3)) did not equal Set((("c", 4), 4), (("c", 3), 3), (("c", 1), 1), (("c", 5), 5), (("c", 2), 2), (("c", 6), 6)) (StateStoreSuite.scala:917)
   [info]   Analysis:
   [info]   Set(missingInLeft: [(("c", 4), 4), (("c", 5), 5), (("c", 6), 6)])
   ```
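   
   The underlying RocksDB behavior is easy to reproduce standalone (illustration only, not PR code; the path is made up): a native RocksIterator sees a consistent snapshot taken when it is created, so puts issued afterwards are invisible to it, which is exactly why a cached iterator goes stale once writes go directly to the DB:
   
   ```scala
   import org.rocksdb.{Options, RocksDB}
   
   RocksDB.loadLibrary()
   val db = RocksDB.open(new Options().setCreateIfMissing(true), "/tmp/rocksdb-iter-demo")
   db.put("c1".getBytes, "1".getBytes)
   val iter = db.newIterator()               // snapshot taken here
   db.put("c4".getBytes, "4".getBytes)       // written after the iterator was created
   iter.seekToFirst()
   while (iter.isValid) {
     println(new String(iter.key()))         // prints only c1; c4 is not visible
     iter.next()
   }
   iter.close()
   db.close()
   ```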





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179961280


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   Yeah, I'm OK with removing the optimization, but then I'd like to be sure the iterators are closed. We intended not to close the iterator at the caller side so it could be reused, but now we should ensure it is closed either here or from the caller(s).





[GitHub] [spark] anishshri-db commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1180871653


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -61,17 +60,27 @@ class RocksDB(
 
   // Java wrapper objects linking to native RocksDB objects
   private val readOptions = new ReadOptions()  // used for gets
-  private val writeOptions = new WriteOptions().setSync(true)  // wait for batched write to complete

Review Comment:
   Yea, I don't think it was actually required even in the existing model. But with the new model, the RocksDB library prevents using this option at all and throws this exception:
   
   ```
   [info]   org.rocksdb.RocksDBException: Sync writes has to enable WAL.
   [info]   at org.rocksdb.RocksDB.put(Native Method)
   [info]   at org.rocksdb.RocksDB.put(RocksDB.java:904)
   ```
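   
   A minimal standalone illustration of that RocksJava behavior (an assumption-level sketch, not Spark code; the path is made up): with the WAL disabled, a write issued with sync=true is rejected, so the change keeps the WAL disabled and drops sync entirely:
   
   ```scala
   import org.rocksdb.{Options, RocksDB, WriteOptions}
   
   RocksDB.loadLibrary()
   val db = RocksDB.open(new Options().setCreateIfMissing(true), "/tmp/rocksdb-wal-demo")
   
   val syncNoWal = new WriteOptions().setSync(true).setDisableWAL(true)
   // db.put(syncNoWal, "k".getBytes, "v".getBytes)
   // would throw org.rocksdb.RocksDBException: "Sync writes has to enable WAL."
   
   val noWal = new WriteOptions().setDisableWAL(true)  // what the change moves to: no sync, no WAL
   db.put(noWal, "k".getBytes, "v".getBytes)
   db.close()
   ```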





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179859324


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   My memory is broken but there should be a reason I didn't follow the cleanup approach of iterator in iterator(). That relies on task completion listener, and I roughly remember there was an issue with iterator in prefixScan so I couldn't do the same. 
   
   If that was related to WriteBatch then we may not need this anymore and just follow the approach of iterator(), but honestly I didn't track down the issue as such.
   
   That said, if we remove this, we probably need to employ the cleanup approach we use in iterator().





[GitHub] [spark] anishshri-db commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179867179


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   Yea, so the previous model assumed that the iterator could reflect changes to the writeBatch for the given thread. That is no longer true with this change.
   
   So basically the following would fail if we were to use the cached iterator:
   - Load version 0
   - Add kv pairs (c, 1), (c, 2), (c, 3)
   - Commit version 0
   - Load version 1
   - Call prefix scan and load the map
   - Add kv pairs (c, 4), (c, 5), (c, 6)
   - Read key-values: if we rely only on the old iterator cached in the map, we read only (c, 1), (c, 2), (c, 3). With the old writeBatch in place, the iterator was valid for the writeBatch as well, which is why we could read all the kv pairs correctly. With the new change, we won't read the newly added values written to the DB.
   
   Sample failure:
   ```
   [info]   Set((("c", 1), 1), (("c", 2), 2), (("c", 3), 3)) did not equal Set((("c", 4), 4), (("c", 3), 3), (("c", 1), 1), (("c", 5), 5), (("c", 2), 2), (("c", 6), 6)) (StateStoreSuite.scala:917)
   [info]   Analysis:
   [info]   Set(missingInLeft: [(("c", 4), 4), (("c", 5), 5), (("c", 6), 6)])
   ```







[GitHub] [spark] ivanmurray commented on pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "ivanmurray (via GitHub)" <gi...@apache.org>.
ivanmurray commented on PR #40981:
URL: https://github.com/apache/spark/pull/40981#issuecomment-1599100894

   Could this by any chance get pulled into 3.4.1?




[GitHub] [spark] siying commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "siying (via GitHub)" <gi...@apache.org>.
siying commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179746525


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -348,9 +340,8 @@ class RocksDB(
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
-    closePrefixScanIterators()
-    resetWriteBatch()
     numKeysOnWritingVersion = numKeysOnLoadedVersion
+    loadedVersion = -1L

Review Comment:
   Does this mean we will load a checkpoint from DFS?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   I noticed that the iterator reuse logic is removed. Is that related to this change? It could cause a CPU regression.





[GitHub] [spark] HeartSaVioR commented on pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #40981:
URL: https://github.com/apache/spark/pull/40981#issuecomment-1619328198

   @ivanmurray 
   Sorry, the Spark community has been avoiding bringing new features/improvements into bugfix versions. We are about to start the release phase for Spark 3.5.0, so stay tuned!




[GitHub] [spark] anishshri-db commented on pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #40981:
URL: https://github.com/apache/spark/pull/40981#issuecomment-1619324055

   @ivanmurray - Hmm not sure. cc - @HeartSaVioR - thoughts ?




[GitHub] [spark] anishshri-db commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179873218


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   Yes, correct. We close it in commit & rollback. In this case, the prefix scan call is made twice though: once to check values from the previous batch (which adds an entry in the map), and then for the newly added values in the current batch (which reuses the entry from the cache).





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179961280


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   Yeah, I'm OK with removing the optimization, but then I'd like to be sure the iterators are closed. We intended not to close the iterator at the caller side so it could be reused, but since we no longer reuse the iterator, we should ensure it is closed either here or from the caller(s).





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179895611


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   My bad, you're right, I forgot about it. There are two usages of prefix scan, and I intentionally didn't close the iterator per usage so that the underlying iterator could be reused between them. But that optimization seems dangerous, as it will lead to a correctness issue if there is any possibility of the two usages hitting the iterator at the same time. If the benefit we get from a dangerous optimization is not huge, I'd lean towards removing it. We just need to revisit the resource cleanup for the prefix scan iterator then.





[GitHub] [spark] Kimahriman commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "Kimahriman (via GitHub)" <gi...@apache.org>.
Kimahriman commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1180419850


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -348,9 +340,8 @@ class RocksDB(
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
-    closePrefixScanIterators()
-    resetWriteBatch()
     numKeysOnWritingVersion = numKeysOnLoadedVersion
+    loadedVersion = -1L

Review Comment:
   Do any local files need to be cleaned up if they've been written to disk during the task?





[GitHub] [spark] anishshri-db commented on pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #40981:
URL: https://github.com/apache/spark/pull/40981#issuecomment-1529955932

   Thanks @HeartSaVioR . @siying and @Kimahriman - could you folks please take a look ? Thx




[GitHub] [spark] anishshri-db commented on pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #40981:
URL: https://github.com/apache/spark/pull/40981#issuecomment-1526254611

   @HeartSaVioR - please take a look. Thx




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179851039


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -348,9 +340,8 @@ class RocksDB(
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
-    closePrefixScanIterators()
-    resetWriteBatch()
     numKeysOnWritingVersion = numKeysOnLoadedVersion
+    loadedVersion = -1L

Review Comment:
   Yeah, since we no longer use WriteBatch, we are unable to roll back any writes locally and have to re-load from DFS. We think the failure rate is small enough to tolerate that cost, unless the query (or infrastructure) is unstable and cannot make progress at all.
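   
   A hedged sketch of that reload-on-demand pattern (assumed names, heavily simplified; not the actual RocksDB.scala code): rollback() only forgets the loaded version, and the next load() notices the mismatch and restores the checkpoint from DFS:
   
   ```scala
   class RollbackByReloadSketch {
     private var loadedVersion: Long = -1L
     private var numKeysOnLoadedVersion: Long = 0L
     private var numKeysOnWritingVersion: Long = 0L
   
     def rollback(): Unit = {
       numKeysOnWritingVersion = numKeysOnLoadedVersion
       loadedVersion = -1L                  // nothing to undo locally without a WriteBatch
     }
   
     def load(version: Long): Unit = {
       if (loadedVersion != version) {      // always true right after rollback()
         closeNativeDb()                    // drop the local native instance
         restoreCheckpointFromDfs(version)  // re-download and unzip the snapshot
         openNativeDb()                     // reopen on the restored files
         loadedVersion = version
       }
     }
   
     // Stubs so the sketch compiles; the real provider does substantially more work here.
     private def closeNativeDb(): Unit = ()
     private def restoreCheckpointFromDfs(version: Long): Unit = ()
     private def openNativeDb(): Unit = ()
   }
   ```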





[GitHub] [spark] anishshri-db commented on a diff in pull request #40981: [SPARK-43311][SS] Add RocksDB state store provider memory management enhancements

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179752292


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -348,9 +340,8 @@ class RocksDB(
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
-    closePrefixScanIterators()
-    resetWriteBatch()
     numKeysOnWritingVersion = numKeysOnLoadedVersion
+    loadedVersion = -1L

Review Comment:
   Yes correct


