Posted to reviews@spark.apache.org by "anishshri-db (via GitHub)" <gi...@apache.org> on 2024/03/08 07:34:09 UTC

[PR] [WIP] Issue to fix foreachbatch persist issue for stateful queries [spark]

anishshri-db opened a new pull request, #45432:
URL: https://github.com/apache/spark/pull/45432

   ### What changes were proposed in this pull request?
   Persist the batch DataFrame in `ForeachBatchSink` for stateful queries and unpersist it once the batch writer returns.
   
   
   ### Why are the changes needed?
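   Inside a `foreachBatch` function, each action on the batch DataFrame can recompute the batch. For a stateful query this means the stateful operators reload state multiple times per batch. Persisting the DataFrame for the duration of the batch avoids the repeated reload.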
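
To illustrate the situation described above, here is a minimal user-side sketch, not code from this PR. It assumes a stateful aggregation over the built-in `rate` source and a `foreachBatch` function that runs two actions on the same batch. The object name, the `start` method, and the two output directories are hypothetical. Persisting the batch DataFrame lets the second write reuse the cached result instead of recomputing the batch.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

object ForeachBatchPersistSketch {
  // `outputDirA` and `outputDirB` are hypothetical output locations supplied by the caller.
  def start(spark: SparkSession, outputDirA: String, outputDirB: String): StreamingQuery = {
    // A streaming aggregation is stateful: running counts are kept in the state store.
    val counts = spark.readStream
      .format("rate")
      .load()
      .groupBy("value")
      .count()

    // Declared as a typed function value so the Scala overload of foreachBatch is selected.
    val writeBatch: (DataFrame, Long) => Unit = (batchDf, batchId) => {
      batchDf.persist()
      try {
        // Two actions on the same batch; the second can reuse the cached result.
        batchDf.write.mode("overwrite").parquet(s"$outputDirA/batch-$batchId")
        batchDf.write.mode("overwrite").parquet(s"$outputDirB/batch-$batchId")
      } finally {
        // Release the cached data whether or not the writes succeeded.
        batchDf.unpersist()
      }
    }

    counts.writeStream
      .outputMode("update")
      .foreachBatch(writeBatch)
      .start()
  }
}
```

A caller would invoke `ForeachBatchPersistSketch.start(spark, dirA, dirB)` and manage the returned `StreamingQuery` as usual.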
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Added unit tests
   
   ```
   [info] Run completed in 20 seconds, 898 milliseconds.
   [info] Total number of tests run: 11
   [info] Suites: completed 1, aborted 0
   [info] Tests: succeeded 11, failed 0, canceled 0, ignored 0, pending 0
   [info] All tests passed.
   ```
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47329][SS] Persist dataframe while using foreachbatch and stateful streaming query to prevent state from being re-loaded in each batch [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #45432:
URL: https://github.com/apache/spark/pull/45432#discussion_r1522240457


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSink.scala:
##########
@@ -22,19 +22,38 @@ import scala.util.control.NonFatal
 import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState, FlatMapGroupsWithState, GlobalLimit, Join, LogicalPlan, TransformWithState}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.streaming.DataStreamWriter
 
 class ForeachBatchSink[T](batchWriter: (Dataset[T], Long) => Unit, encoder: ExpressionEncoder[T])
   extends Sink {
 
+  private def isQueryStateful(logicalPlan: LogicalPlan): Boolean = {
+    logicalPlan.collect {
+      case node @ (_: Aggregate | _: Distinct | _: FlatMapGroupsWithState
+                   | _: FlatMapGroupsInPandasWithState | _: TransformWithState | _: Deduplicate
+                   | _: DeduplicateWithinWatermark | _: GlobalLimit) if node.isStreaming => node
+      case node @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => node
+    }.nonEmpty
+  }
+
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     val node = LogicalRDD.fromDataset(rdd = data.queryExecution.toRdd, originDataset = data,
       isStreaming = false)
     implicit val enc = encoder
     val ds = Dataset.ofRows(data.sparkSession, node).as[T]
-    callBatchWriter(ds, batchId)
+    // SPARK-47329 - persist the dataframe for stateful queries to prevent state stores
+    // from reloading state multiple times in each batch
+    val isStateful = isQueryStateful(data.logicalPlan)
+    if (isStateful) {
+      ds.persist()
+      callBatchWriter(ds, batchId)
+      ds.unpersist()

Review Comment:
   Shall we wrap this in `try`/`catch`/`finally` so that the cached data is not leaked if `callBatchWriter` throws?





Re: [PR] [SPARK-47329][SS] Persist dataframe while using foreachbatch and stateful streaming query to prevent state from being re-loaded in each batch [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45432:
URL: https://github.com/apache/spark/pull/45432#discussion_r1522347224


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSink.scala:
##########
@@ -22,19 +22,38 @@ import scala.util.control.NonFatal
 import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState, FlatMapGroupsWithState, GlobalLimit, Join, LogicalPlan, TransformWithState}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.streaming.DataStreamWriter
 
 class ForeachBatchSink[T](batchWriter: (Dataset[T], Long) => Unit, encoder: ExpressionEncoder[T])
   extends Sink {
 
+  private def isQueryStateful(logicalPlan: LogicalPlan): Boolean = {
+    logicalPlan.collect {
+      case node @ (_: Aggregate | _: Distinct | _: FlatMapGroupsWithState
+                   | _: FlatMapGroupsInPandasWithState | _: TransformWithState | _: Deduplicate
+                   | _: DeduplicateWithinWatermark | _: GlobalLimit) if node.isStreaming => node
+      case node @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => node
+    }.nonEmpty
+  }
+
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     val node = LogicalRDD.fromDataset(rdd = data.queryExecution.toRdd, originDataset = data,
       isStreaming = false)
     implicit val enc = encoder
     val ds = Dataset.ofRows(data.sparkSession, node).as[T]
-    callBatchWriter(ds, batchId)
+    // SPARK-47329 - persist the dataframe for stateful queries to prevent state stores
+    // from reloading state multiple times in each batch
+    val isStateful = isQueryStateful(data.logicalPlan)
+    if (isStateful) {
+      ds.persist()
+      callBatchWriter(ds, batchId)
+      ds.unpersist()

Review Comment:
   Done. I guess `try`/`finally` should be enough here?
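
The `try`/`finally` shape being discussed can be sketched as follows. This is an illustration under stated assumptions, not the code that was pushed to the PR: `PersistAroundWriter` and `runBatch` are hypothetical names, and `callBatchWriter` is a plain function parameter standing in for the sink's private method. No `catch` clause is needed because the exception should still propagate to the caller; `finally` only guarantees the cleanup.

```scala
import org.apache.spark.sql.Dataset

object PersistAroundWriter {
  // Hypothetical stand-in for the stateful branch of the sink's addBatch.
  def runBatch[T](ds: Dataset[T], batchId: Long, isStateful: Boolean)(
      callBatchWriter: (Dataset[T], Long) => Unit): Unit = {
    if (isStateful) {
      ds.persist()
      try {
        callBatchWriter(ds, batchId)
      } finally {
        // Runs on success and on failure, so the cached data is always released.
        ds.unpersist()
      }
    } else {
      // Stateless queries are passed through without caching.
      callBatchWriter(ds, batchId)
    }
  }
}
```

If the writer throws, the exception reaches the caller unchanged and the dataset is no longer registered in the cache.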





Re: [PR] [SPARK-47329][SS] Persist dataframe while using foreachbatch and stateful streaming query to prevent state from being re-loaded in each batch [spark]

Posted by "Bobstar55 (via GitHub)" <gi...@apache.org>.
Bobstar55 commented on code in PR #45432:
URL: https://github.com/apache/spark/pull/45432#discussion_r1522898039


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSink.scala:
##########
@@ -22,19 +22,41 @@ import scala.util.control.NonFatal
 import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState, FlatMapGroupsWithState, GlobalLimit, Join, LogicalPlan, TransformWithState}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.streaming.DataStreamWriter
 
 class ForeachBatchSink[T](batchWriter: (Dataset[T], Long) => Unit, encoder: ExpressionEncoder[T])
   extends Sink {
 
+  private def isQueryStateful(logicalPlan: LogicalPlan): Boolean = {
+    logicalPlan.collect {
+      case node @ (_: Aggregate | _: Distinct | _: FlatMapGroupsWithState

Review Comment:
   join 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSink.scala:
##########
@@ -22,19 +22,41 @@ import scala.util.control.NonFatal
 import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState, FlatMapGroupsWithState, GlobalLimit, Join, LogicalPlan, TransformWithState}

Review Comment:
   join





Re: [PR] [SPARK-47329][SS][DOCS] Add note to persist dataframe while using foreachbatch and stateful streaming query to prevent state from being re-loaded in each batch [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45432:
URL: https://github.com/apache/spark/pull/45432#issuecomment-2005615311

   Thanks! Merging to master.




Re: [PR] [SPARK-47329][SS] Persist dataframe while using foreachbatch and stateful streaming query to prevent state from being re-loaded in each batch [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #45432:
URL: https://github.com/apache/spark/pull/45432#issuecomment-1986223860

   @HeartSaVioR @sahnib @zsxwing - Could you please take a look? Thanks.




Re: [PR] [SPARK-47329][SS][DOCS] Add note to persist dataframe while using foreachbatch and stateful streaming query to prevent state from being re-loaded in each batch [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #45432: [SPARK-47329][SS][DOCS] Add note to persist dataframe while using foreachbatch and stateful streaming query to prevent state from being re-loaded in each batch
URL: https://github.com/apache/spark/pull/45432




Re: [PR] [SPARK-47329][SS] Persist dataframe while using foreachbatch and stateful streaming query to prevent state from being re-loaded in each batch [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #45432:
URL: https://github.com/apache/spark/pull/45432#discussion_r1522535875


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSink.scala:
##########
@@ -22,19 +22,38 @@ import scala.util.control.NonFatal
 import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState, FlatMapGroupsWithState, GlobalLimit, Join, LogicalPlan, TransformWithState}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.streaming.DataStreamWriter
 
 class ForeachBatchSink[T](batchWriter: (Dataset[T], Long) => Unit, encoder: ExpressionEncoder[T])
   extends Sink {
 
+  private def isQueryStateful(logicalPlan: LogicalPlan): Boolean = {
+    logicalPlan.collect {
+      case node @ (_: Aggregate | _: Distinct | _: FlatMapGroupsWithState
+                   | _: FlatMapGroupsInPandasWithState | _: TransformWithState | _: Deduplicate
+                   | _: DeduplicateWithinWatermark | _: GlobalLimit) if node.isStreaming => node
+      case node @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => node
+    }.nonEmpty
+  }
+
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     val node = LogicalRDD.fromDataset(rdd = data.queryExecution.toRdd, originDataset = data,
       isStreaming = false)
     implicit val enc = encoder
     val ds = Dataset.ofRows(data.sparkSession, node).as[T]
-    callBatchWriter(ds, batchId)
+    // SPARK-47329 - persist the dataframe for stateful queries to prevent state stores
+    // from reloading state multiple times in each batch
+    val isStateful = isQueryStateful(data.logicalPlan)
+    if (isStateful) {
+      ds.persist()
+      callBatchWriter(ds, batchId)
+      ds.unpersist()

Review Comment:
   Yes, `try`/`finally` is sufficient.







