You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Trevor Christman (Jira)" <ji...@apache.org> on 2022/04/15 18:39:00 UTC

[jira] [Created] (SPARK-38917) StreamingQuery.processAllAvailable() blocks forever on queries containing mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())

Trevor Christman created SPARK-38917:
----------------------------------------

             Summary: StreamingQuery.processAllAvailable() blocks forever on queries containing mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())
                 Key: SPARK-38917
                 URL: https://issues.apache.org/jira/browse/SPARK-38917
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.1.2
            Reporter: Trevor Christman


StreamingQuery.processAllAvailable() blocks forever when called on queries containing a mapGroupsWithState operation configured with GroupStateTimeout.ProcessingTimeTimeout().

 

I think processAllAvailable() should return once all incoming data has been processed AND none of the existing group states has a timeout currently set.

 

Sample code to demonstrate this failure follows:
{code:scala}
/**
 * Reproduces SPARK-38917: runs the same `mapGroupsWithState` running-total aggregation over a
 * small in-memory stream, once with `GroupStateTimeout.NoTimeout()` and once with
 * `GroupStateTimeout.ProcessingTimeTimeout()`, and calls `processAllAvailable()` on each query.
 *
 * Each query writes its updates to the console sink. As reported, the `NoTimeout` query's
 * `processAllAvailable()` returns, but the `ProcessingTimeTimeout` one never does, so this method
 * does not complete.
 *
 * NOTE(review): presumably this happens because an operator using `ProcessingTimeTimeout` asks
 * the engine to keep running no-data micro-batches (so that timeouts can fire), so the query is
 * never idle. Setting `spark.sql.streaming.noDataMicroBatches.enabled` to `false` may work around
 * it. Confirm against `FlatMapGroupsWithStateExec.shouldRunAnotherBatch`.
 */
def demoSparkProcessAllAvailableBug(): Unit = {
    val localSpark = SparkSession
      .builder()
      .master("local[*]")
      .appName("demoSparkProcessAllAvailableBug")
      .config("spark.driver.host", "localhost")
      .getOrCreate()

    import localSpark.implicits._

    // All four rows are queued before any query starts, so each query sees them as available data.
    val demoDataStream = MemoryStream[BugDemo.NameNumberData](1, localSpark.sqlContext)
    demoDataStream.addData(BugDemo.NameNumberData("Alice", 1))
    demoDataStream.addData(BugDemo.NameNumberData("Bob", 2))
    demoDataStream.addData(BugDemo.NameNumberData("Alice", 3))
    demoDataStream.addData(BugDemo.NameNumberData("Bob", 4))

    // StreamingQuery.processAllAvailable() is successful when executing against NoTimeout,
    // but blocks forever when executing against ProcessingTimeTimeout
    val timeoutTypes = List(GroupStateTimeout.NoTimeout(), GroupStateTimeout.ProcessingTimeTimeout())

    for (timeoutType <- timeoutTypes) {
      val totalByName = demoDataStream.toDF()
        .as[BugDemo.NameNumberData]
        .groupByKey(_.Name)
        .mapGroupsWithState(timeoutType)(BugDemo.summateRunningTotal)

      val totalByNameQuery = totalByName
        .writeStream
        .format("console")
        .outputMode("update")
        .start()

      // Always stop the query, even if processAllAvailable() exits abnormally (e.g. the waiting
      // thread is interrupted). Otherwise the started query would keep running in the background.
      try {
        println(s"${timeoutType} query starting to processAllAvailable()")
        totalByNameQuery.processAllAvailable()
        println(s"${timeoutType} query completed processAllAvailable()")
      } finally {
        totalByNameQuery.stop()
      }
    }
  }
}

object BugDemo {
  /**
   * State function for `mapGroupsWithState` that keeps a running total of `Number` per key.
   *
   * Adds every `Number` in this call's `input` to the total stored in `groupState` (0 when the
   * key has no state yet). It then writes the new total back to `groupState` and emits it.
   * It never sets a timeout on `groupState`, so no group ever has a pending timeout, whichever
   * `GroupStateTimeout` the query was configured with.
   *
   * @param name       the grouping key
   * @param input      the rows for this key passed in by Spark for the current invocation
   * @param groupState the running total carried over from earlier invocations, if any
   * @return the key paired with its updated running total
   */
  def summateRunningTotal(
      name: String,
      input: Iterator[NameNumberData],
      groupState: GroupState[RunningTotal]): NameNumberData = {
    // Start from the stored total, or 0 for a key seen for the first time.
    val previousTotal: Int = groupState.getOption.map(_.Total.intValue).getOrElse(0)

    // `Number` is a boxed java.lang.Integer; a null value throws NullPointerException here.
    val currentTotal: Int =
      input.foldLeft(previousTotal)((total, row) => total + row.Number.intValue)

    groupState.update(RunningTotal(currentTotal))

    NameNumberData(name, currentTotal)
  }

  /**
   * A (name, number) row.
   *
   * It is used both as stream input and as the (name, running total) value emitted by
   * `summateRunningTotal`.
   */
  case class NameNumberData(
    Name: String,
    Number: Integer
  )

  /** Per-key state kept by `summateRunningTotal`: the running total so far. */
  case class RunningTotal(
    Total: Integer
  )
} {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org