You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "siying (via GitHub)" <gi...@apache.org> on 2024/01/12 20:40:05 UTC

[PR] Make RocksDB State Store Compaction Less Likely to fall behind [spark]

siying opened a new pull request, #44712:
URL: https://github.com/apache/spark/pull/44712

   
   ### What changes were proposed in this pull request?
   (1) increase RocksDB L0 compaction trigger, slowdown trigger and stop trigger
   (2) Increase background threads for flush and compaction to 2. To limit the chance of a CPU spike, make CPU priority for compaction to be low
   
   ### Why are the changes needed?
   We introduce two RocksDB tunings to reduce the chance that RocksDB compaction can fall behind, delay a checkpoint.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   We run an end-to-end stream query where RocksDB state store is reasonably loaded. This change reduces latency by about 30%.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46705][SS] Make RocksDB State Store Compaction Less Likely to fall behind [spark]

Posted by "siying (via GitHub)" <gi...@apache.org>.
siying commented on code in PR #44712:
URL: https://github.com/apache/spark/pull/44712#discussion_r1465491569


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -105,6 +105,17 @@ class RocksDB(
   }
 
   columnFamilyOptions.setCompressionType(getCompressionType(conf.compression))
+  // We can easily accumulate many small L0 files if changelog is not enabled, or full snapshot
+  // interval is not large enough. Triggering compactions for those small files are expensive.
+  // We make it harder to trigger and allow more L0 files without write stalling.
+  // We increase L0->L1 compaction threshold to reduce the overhead of L0->L1 compaction.
+  // Given the nature of small L0 files, for most workloads, even the value of 16 is too low for
+  // L0->L1 write amplification. However, if the value is too large, we may risk the chance that
+  // in some workloads where batch size is very large, some data might take a very long time to
+  // be compacted.
+  columnFamilyOptions.setLevel0FileNumCompactionTrigger(16)
+  columnFamilyOptions.setLevel0SlowdownWritesTrigger(200)

Review Comment:
   It is the crucial part of the PR. By default we do snapshot checkpoint every 10 batches, and compaction triggers in every 4 flushes (right now a snapshot checkpoint does an implicit flush). So in 1 in 40 batches, we will trigger a compaction, and that batch's latency will impact P99. If we increase the compaction threshold from 4 to 16, we will only do compaction every 1 in 160 batches, and it will push this out of P99.
   
   Of course, by having it to happen 1/4 of the time, the compaction is much less likely to catchup, so we probably won't have the problem anyway, but even if we have the problem in another workload, it is much less likely to impact P99.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46705][SS] Make RocksDB State Store Compaction Less Likely to fall behind [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44712:
URL: https://github.com/apache/spark/pull/44712#discussion_r1454498382


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -671,6 +682,21 @@ class RocksDB(
   override protected def logName: String = s"${super.logName} $loggingId"
 }
 
+object RocksDB {
+  // Increase background thread pool size to 2 flush threads and 2 compaction threads.
+  // Snapshot checkpoint requires a flush, so more threads will reduce the blocking time. More
+  // compaction threads will reduce the chance that compaction is backlogged, causing online
+  // traffic to slowdown.
+  if (RocksDBEnv.getDefault().getBackgroundThreads(Priority.HIGH) < 2) {

Review Comment:
   How would this interact with the `setMaxgroundJobs` setting though ?
   
   if user sets maxBackgroundJobs=2, then we are explicitly overriding to 4 ? should we change minimum allowed for maxBackgroundJobs to 4 then ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46705][SS] Make RocksDB State Store Compaction Less Likely to fall behind [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44712:
URL: https://github.com/apache/spark/pull/44712#discussion_r1465823018


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -671,6 +682,21 @@ class RocksDB(
   override protected def logName: String = s"${super.logName} $loggingId"
 }
 
+object RocksDB {

Review Comment:
   We are not saying it will be called multiple times. We meant it is not clear whether this block is ever evaluated (executed once) or not (never executed).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46705][SS] Make RocksDB State Store Compaction Less Likely to fall behind [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #44712: [SPARK-46705][SS] Make RocksDB State Store Compaction Less Likely to fall behind
URL: https://github.com/apache/spark/pull/44712


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46705][SS] Make RocksDB State Store Compaction Less Likely to fall behind [spark]

Posted by "siying (via GitHub)" <gi...@apache.org>.
siying commented on code in PR #44712:
URL: https://github.com/apache/spark/pull/44712#discussion_r1465491569


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -105,6 +105,17 @@ class RocksDB(
   }
 
   columnFamilyOptions.setCompressionType(getCompressionType(conf.compression))
+  // We can easily accumulate many small L0 files if changelog is not enabled, or full snapshot
+  // interval is not large enough. Triggering compactions for those small files are expensive.
+  // We make it harder to trigger and allow more L0 files without write stalling.
+  // We increase L0->L1 compaction threshold to reduce the overhead of L0->L1 compaction.
+  // Given the nature of small L0 files, for most workloads, even the value of 16 is too low for
+  // L0->L1 write amplification. However, if the value is too large, we may risk the chance that
+  // in some workloads where batch size is very large, some data might take a very long time to
+  // be compacted.
+  columnFamilyOptions.setLevel0FileNumCompactionTrigger(16)
+  columnFamilyOptions.setLevel0SlowdownWritesTrigger(200)

Review Comment:
   It is the crucial part of the PR. By default we do snapshot checkpoint every 10 batches, and compaction triggers in every 4 flushes (right now a snapshot checkpoint does an implicit flush). So in 1 in 40 batches, we will trigger a compaction, and that batch's latency will impact P99. If we increase the compaction threshold from 4 to 16, we will only do compaction every 1 in 160 batches, and it will push this out of P99.
   
   Of course, by having it to happen 1/4 of the time, the compaction is much less likely to catchup, so we probably won't have the problem anyway, but even if we have the problem in another workload, it is much less likely to impact P99.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -671,6 +682,21 @@ class RocksDB(
   override protected def logName: String = s"${super.logName} $loggingId"
 }
 
+object RocksDB {

Review Comment:
   Sure I'll add a logging and see how many times it is called, though I believe it is a singleton.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] Make RocksDB State Store Compaction Less Likely to fall behind [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #44712:
URL: https://github.com/apache/spark/pull/44712#issuecomment-1890180860

   Let's file a JIRA, see also https://spark.apache.org/contributing.html


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46705][SS] Make RocksDB State Store Compaction Less Likely to fall behind [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44712:
URL: https://github.com/apache/spark/pull/44712#discussion_r1454494100


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -105,6 +105,17 @@ class RocksDB(
   }
 
   columnFamilyOptions.setCompressionType(getCompressionType(conf.compression))
+  // We can easily accumulate many small L0 files if changelog is not enabled, or full snapshot
+  // interval is not large enough. Triggering compactions for those small files are expensive.
+  // We make it harder to trigger and allow more L0 files without write stalling.
+  // We increase L0->L1 compaction threshold to reduce the overhead of L0->L1 compaction.
+  // Given the nature of small L0 files, for most workloads, even the value of 16 is too low for
+  // L0->L1 write amplification. However, if the value is too large, we may risk the chance that
+  // in some workloads where batch size is very large, some data might take a very long time to
+  // be compacted.
+  columnFamilyOptions.setLevel0FileNumCompactionTrigger(16)
+  columnFamilyOptions.setLevel0SlowdownWritesTrigger(200)

Review Comment:
   Do we need to make any of these configurable ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46705][SS] Make RocksDB State Store Compaction Less Likely to fall behind [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44712:
URL: https://github.com/apache/spark/pull/44712#discussion_r1465823018


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -671,6 +682,21 @@ class RocksDB(
   override protected def logName: String = s"${super.logName} $loggingId"
 }
 
+object RocksDB {

Review Comment:
   We are not saying it will be called multiple times. We meant it is not clear whether this block is ever evaluated (executed once) or not (never executed), because we do not explicitly refer this object.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46705][SS] Make RocksDB State Store Compaction Less Likely to fall behind [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44712:
URL: https://github.com/apache/spark/pull/44712#discussion_r1454496688


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -671,6 +682,21 @@ class RocksDB(
   override protected def logName: String = s"${super.logName} $loggingId"
 }
 
+object RocksDB {

Review Comment:
   Do the code blocks below need to be embedded within a singleton ? Would the blocks be invoked currently ?
   
   cc - @HeartSaVioR - to confirm
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46705][SS] Make RocksDB State Store Compaction Less Likely to fall behind [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44712:
URL: https://github.com/apache/spark/pull/44712#discussion_r1455360732


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -671,6 +682,21 @@ class RocksDB(
   override protected def logName: String = s"${super.logName} $loggingId"
 }
 
+object RocksDB {

Review Comment:
   Maybe it won't be evaluated till it is referenced. @siying Could you please try adding a log to see whether the log is printed without referencing the object?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46705][SS] Make RocksDB State Store Compaction Less Likely to fall behind [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #44712:
URL: https://github.com/apache/spark/pull/44712#issuecomment-2094515094

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org