You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2022/04/18 00:03:00 UTC

[jira] [Updated] (SPARK-38917) StreamingQuery.processAllAvailable() blocks forever on queries containing mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())

     [ https://issues.apache.org/jira/browse/SPARK-38917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-38917:
---------------------------------
    Priority: Major  (was: Critical)

> StreamingQuery.processAllAvailable() blocks forever on queries containing mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-38917
>                 URL: https://issues.apache.org/jira/browse/SPARK-38917
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.1.2
>            Reporter: Trevor Christman
>            Priority: Major
>
> StreamingQuery.processAllAvailable() blocks forever when called on queries containing a mapGroupsWithState operation configured with GroupStateTimeout.ProcessingTimeTimeout().
>  
> I think processAllAvailable() should unblock when all incoming data has been processed AND no existing group state has a timeout currently set.
>  
> Sample code to demonstrate this failure follows:
> {code:java}
> def demoSparkProcessAllAvailableBug() : Unit = {
>     val localSpark = SparkSession
>       .builder()
>       .master("local[*]")
>       .appName("demoSparkProcessAllAvailableBug")
>       .config("spark.driver.host", "localhost")
>       .getOrCreate()
>     import localSpark.implicits._
>     val demoDataStream = MemoryStream[BugDemo.NameNumberData](1, localSpark.sqlContext)
>     demoDataStream.addData(BugDemo.NameNumberData("Alice", 1))
>     demoDataStream.addData(BugDemo.NameNumberData("Bob", 2))
>     demoDataStream.addData(BugDemo.NameNumberData("Alice", 3))
>     demoDataStream.addData(BugDemo.NameNumberData("Bob", 4))
>     // StreamingQuery.processAllAvailable() is successful when executing against NoTimeout,
>     // but blocks forever when executing against ProcessingTimeTimeout
>     val timeoutTypes = List(GroupStateTimeout.NoTimeout(), GroupStateTimeout.ProcessingTimeTimeout())
>     for (timeoutType <- timeoutTypes) {
>       val totalByName = demoDataStream.toDF()
>         .as[BugDemo.NameNumberData]
>         .groupByKey(_.Name)
>         .mapGroupsWithState(timeoutType)(BugDemo.summateRunningTotal)
>       val totalByNameQuery = totalByName
>         .writeStream
>         .format("console")
>         .outputMode("update")
>         .start()
>       println(s"${timeoutType} query starting to processAllAvailable()")
>       totalByNameQuery.processAllAvailable()
>       println(s"${timeoutType} query completed processAllAvailable()")
>       totalByNameQuery.stop()
>     }
>   }
> }
> object BugDemo {
>   def summateRunningTotal(name: String, input: Iterator[NameNumberData], groupState: GroupState[RunningTotal]): NameNumberData = {
>     var currentTotal: Int = if (groupState.exists) {
>       groupState.get.Total
>     } else {
>       0
>     }
>     for (nameNumberData <- input) {
>       currentTotal += nameNumberData.Number
>     }
>     groupState.update(RunningTotal(currentTotal))
>     NameNumberData(name, currentTotal)
>   }
>   case class NameNumberData(
>     Name: String,
>     Number: Integer
>   )
>   case class RunningTotal(
>     Total: Integer
>   )
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org