You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Trevor Christman (Jira)" <ji...@apache.org> on 2022/04/15 18:39:00 UTC
[jira] [Created] (SPARK-38917) StreamingQuery.processAllAvailable() blocks forever on queries containing mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())
Trevor Christman created SPARK-38917:
----------------------------------------
Summary: StreamingQuery.processAllAvailable() blocks forever on queries containing mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())
Key: SPARK-38917
URL: https://issues.apache.org/jira/browse/SPARK-38917
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 3.1.2
Reporter: Trevor Christman
StreamingQuery.processAllAvailable() blocks forever when called on queries containing a mapGroupsWithState operation configured with GroupStateTimeout.ProcessingTimeTimeout().
I think processAllAvailable() should return once all incoming data has been processed AND none of the existing group states has a timeout currently set.
Sample code to demonstrate this failure follows:
{code:scala}
/**
 * Reproduces SPARK-38917: `StreamingQuery.processAllAvailable()` returns for a
 * `mapGroupsWithState` query using `GroupStateTimeout.NoTimeout()`, but never returns
 * when the same query uses `GroupStateTimeout.ProcessingTimeTimeout()`.
 *
 * Four records are queued on an in-memory source before any query starts. Then, for each
 * timeout type, a separate console-sink query groups the records by name, keeps a
 * running total per name, and is drained with `processAllAvailable()`.
 */
def demoSparkProcessAllAvailableBug(): Unit = {
  val localSpark = SparkSession
    .builder()
    .master("local[*]")
    .appName("demoSparkProcessAllAvailableBug")
    .config("spark.driver.host", "localhost")
    .getOrCreate()
  // Supplies the encoders needed by MemoryStream and `.as[...]` below.
  import localSpark.implicits._

  val demoDataStream = MemoryStream[BugDemo.NameNumberData](1, localSpark.sqlContext)
  demoDataStream.addData(BugDemo.NameNumberData("Alice", 1))
  demoDataStream.addData(BugDemo.NameNumberData("Bob", 2))
  demoDataStream.addData(BugDemo.NameNumberData("Alice", 3))
  demoDataStream.addData(BugDemo.NameNumberData("Bob", 4))

  // StreamingQuery.processAllAvailable() is successful when executing against NoTimeout,
  // but blocks forever when executing against ProcessingTimeTimeout
  val timeoutTypes = List(GroupStateTimeout.NoTimeout(), GroupStateTimeout.ProcessingTimeTimeout())
  for (timeoutType <- timeoutTypes) {
    val totalByName = demoDataStream.toDF()
      .as[BugDemo.NameNumberData]
      .groupByKey(_.Name)
      .mapGroupsWithState(timeoutType)(BugDemo.summateRunningTotal)
    val totalByNameQuery = totalByName
      .writeStream
      .format("console")
      .outputMode("update")
      .start()
    try {
      println(s"${timeoutType} query starting to processAllAvailable()")
      totalByNameQuery.processAllAvailable()
      println(s"${timeoutType} query completed processAllAvailable()")
    } finally {
      // Stop the query even when processAllAvailable() throws (for example because the
      // query terminated with an exception), so it does not keep running after the demo.
      totalByNameQuery.stop()
    }
  }
}
}
object BugDemo {
/**
 * Running-total state function for `mapGroupsWithState`.
 *
 * Adds every `Number` in `input` to the total stored in `groupState` (starting from 0 when
 * no state exists yet for this key). It then writes the new total back into `groupState`
 * and returns it paired with `name`.
 *
 * NOTE(review): this function never calls `setTimeoutDuration` and never checks
 * `hasTimedOut`, so it behaves the same whatever GroupStateTimeout the query was
 * configured with.
 *
 * @param name       the grouping key (the demo groups with `_.Name`)
 * @param input      the records passed in for `name`; fully consumed by this call
 * @param groupState per-key state holding the previous running total
 * @return `name` paired with the updated running total
 * @throws NullPointerException if an input `Number` or the stored `Total` is null,
 *                              because both are unboxed to `Int`
 */
def summateRunningTotal(
    name: String,
    input: Iterator[NameNumberData],
    groupState: GroupState[RunningTotal]): NameNumberData = {
  // Resume from the stored total, or from 0 for a key seen for the first time.
  val previousTotal: Int = groupState.getOption.fold(0)(state => state.Total.intValue)
  val currentTotal: Int =
    input.foldLeft(previousTotal)((total, record) => total + record.Number.intValue)
  groupState.update(RunningTotal(currentTotal))
  NameNumberData(name, currentTotal)
}
/**
 * A single record pairing a name with a number. It is used both as the stream's input
 * element and as the per-name running-total output.
 *
 * The capitalized field names and the boxed (nullable) `java.lang.Integer` type are kept
 * as-is because other code refers to these fields directly (e.g. `_.Name`, `.Number`).
 */
case class NameNumberData(Name: String, Number: Integer)
/**
 * Per-name state value: the running total accumulated so far.
 * `Total` is a boxed (nullable) `java.lang.Integer`, matching `NameNumberData.Number`.
 */
case class RunningTotal(Total: Integer)
} {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org