Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2022/04/18 00:03:00 UTC
[jira] [Updated] (SPARK-38917) StreamingQuery.processAllAvailable() blocks forever on queries containing mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())
[ https://issues.apache.org/jira/browse/SPARK-38917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-38917:
---------------------------------
Priority: Major (was: Critical)
> StreamingQuery.processAllAvailable() blocks forever on queries containing mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())
> ---------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-38917
> URL: https://issues.apache.org/jira/browse/SPARK-38917
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.1.2
> Reporter: Trevor Christman
> Priority: Major
>
> StreamingQuery.processAllAvailable() blocks forever when called on queries containing a mapGroupsWithState operation configured with GroupStateTimeout.ProcessingTimeTimeout().
>
> I think processAllAvailable() should return once all incoming data has been processed AND none of the existing GroupStates has a timeout currently set.
>
> Sample code to demonstrate this failure follows:
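> {code:scala}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.execution.streaming.MemoryStream
> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
>
> // Enclosing object (name assumed; it was omitted from the original snippet)
> object ProcessAllAvailableBugDemo {
> def demoSparkProcessAllAvailableBug() : Unit = {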
> val localSpark = SparkSession
> .builder()
> .master("local[*]")
> .appName("demoSparkProcessAllAvailableBug")
> .config("spark.driver.host", "localhost")
> .getOrCreate()
> import localSpark.implicits._
> val demoDataStream = MemoryStream[BugDemo.NameNumberData](1, localSpark.sqlContext)
> demoDataStream.addData(BugDemo.NameNumberData("Alice", 1))
> demoDataStream.addData(BugDemo.NameNumberData("Bob", 2))
> demoDataStream.addData(BugDemo.NameNumberData("Alice", 3))
> demoDataStream.addData(BugDemo.NameNumberData("Bob", 4))
> // StreamingQuery.processAllAvailable() is successful when executing against NoTimeout,
> // but blocks forever when executing against ProcessingTimeTimeout
> val timeoutTypes = List(GroupStateTimeout.NoTimeout(), GroupStateTimeout.ProcessingTimeTimeout())
> for (timeoutType <- timeoutTypes) {
> val totalByName = demoDataStream.toDF()
> .as[BugDemo.NameNumberData]
> .groupByKey(_.Name)
> .mapGroupsWithState(timeoutType)(BugDemo.summateRunningTotal)
> val totalByNameQuery = totalByName
> .writeStream
> .format("console")
> .outputMode("update")
> .start()
> println(s"${timeoutType} query starting to processAllAvailable()")
> totalByNameQuery.processAllAvailable()
> println(s"${timeoutType} query completed processAllAvailable()")
> totalByNameQuery.stop()
> }
> }
> }
> object BugDemo {
> def summateRunningTotal(name: String, input: Iterator[NameNumberData], groupState: GroupState[RunningTotal]): NameNumberData = {
> var currentTotal: Int = if (groupState.exists) {
> groupState.get.Total
> } else {
> 0
> }
> for (nameNumberData <- input) {
> currentTotal += nameNumberData.Number
> }
> groupState.update(RunningTotal(currentTotal))
> NameNumberData(name, currentTotal)
> }
> case class NameNumberData(
> Name: String,
> Number: Integer
> )
> case class RunningTotal(
> Total: Integer
> )
> } {code}
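When reproducing this in an automated test, a bare processAllAvailable() call hangs the whole test run. Below is a minimal sketch of a timeout guard that uses only the Scala standard library. `BlockingCheck` and `completesWithin` are hypothetical names, not Spark APIs.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper (not part of Spark): guards a potentially blocking call.
object BlockingCheck {
  // Runs `block` on a global-pool thread. Returns true if the block finished
  // (normally or by throwing) within `timeout`, and false otherwise.
  // On timeout the block is NOT cancelled; it keeps running in the background.
  def completesWithin(timeout: FiniteDuration)(block: => Unit): Boolean = {
    val done: Future[Unit] = Future(block)
    try {
      Await.ready(done, timeout)
      true
    } catch {
      case _: TimeoutException => false
    }
  }
}
```

In the demo above, a call such as `BlockingCheck.completesWithin(60.seconds)(totalByNameQuery.processAllAvailable())` would return false for the ProcessingTimeTimeout query if the hang reproduces. The guard does not interrupt the blocked call. It only stops the caller from waiting on it, so the caller can still report the hang and stop the query.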
--
This message was sent by Atlassian Jira
(v8.20.1#820001)