Posted to issues@spark.apache.org by "Jungtaek Lim (Jira)" <ji...@apache.org> on 2020/10/27 13:06:00 UTC
[jira] [Commented] (SPARK-33259) Joining 3 streams results in incorrect output
[ https://issues.apache.org/jira/browse/SPARK-33259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221409#comment-17221409 ]
Jungtaek Lim commented on SPARK-33259:
--------------------------------------
As you already figured out, this is a known limitation, and at least for now we ended up documenting the limitation to compensate.
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#limitation-of-global-watermark
Fixing this would require a major change to the concept of the watermark, so without significant demand it is unlikely to be addressed.
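For reference, the workaround described in that section is to split the chained stateful operations into separate queries, each with its own sink and checkpoint, so that the output of the outer join is materialized before the inner join consumes it. Below is only a minimal sketch of that idea applied to the reproducer in this ticket; the sink format, the paths and the re-applied watermark are illustrative assumptions, not tested code:
{code:scala}
// Assumes the same spark-shell session and definitions as in the reproducer below
// (sessionStartsWithMetadata, sessionEndEvents, the imports, etc.).

// Query 1: materialize the outer join (start LEFT JOIN metadata) to durable storage.
val firstQuery = sessionStartsWithMetadata.writeStream
  .format("parquet")
  .option("path", "/tmp/sessionStartsWithMetadata")             // hypothetical path
  .option("checkpointLocation", "/tmp/checkpoints/first-query") // hypothetical path
  .outputMode("append")
  .start()

// Query 2: read the materialized rows back as a stream, re-apply a watermark,
// and run the inner join with the "end" events in its own query, so the join
// no longer sits downstream of another stateful operator in the same query.
val materializedStarts = spark.readStream
  .schema(sessionStartsWithMetadata.schema)
  .parquet("/tmp/sessionStartsWithMetadata")
  .withWatermark("sessionStartTimestamp", "1 second")

val endedSessionsWithMetadata = materializedStarts.join(
  sessionEndEvents,
  materializedStarts("sessionId") === sessionEndEvents("sessionId") &&
    materializedStarts("sessionStartTimestamp").between(
      sessionEndEvents("sessionEndTimestamp").minus(expr("INTERVAL 10 seconds")),
      sessionEndEvents("sessionEndTimestamp")
    )
)

val secondQuery = endedSessionsWithMetadata.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/second-query") // hypothetical path
  .outputMode("append")
  .start()
{code}
Each query then contains a single stateful join, so rows that the outer join emits only after its watermark has passed (such as the null-metadata row) are no longer fed directly into another stateful operator within the same query.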
> Joining 3 streams results in incorrect output
> ---------------------------------------------
>
> Key: SPARK-33259
> URL: https://issues.apache.org/jira/browse/SPARK-33259
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.0.1
> Reporter: Michael
> Priority: Major
>
> I encountered an issue with Structured Streaming when doing a ((A LEFT JOIN B) INNER JOIN C) operation. Below you can see example code I [posted on Stackoverflow|https://stackoverflow.com/questions/64503539/]...
> I created a minimal example of "sessions" that have "start" and "end" events and optionally some "metadata".
> The script generates two outputs: {{sessionStartsWithMetadata}} results from "start" events that are left-joined with the "metadata" events, based on {{sessionId}}. A "left join" is used, since we want an output event even when no corresponding metadata exists.
> Additionally, a DataFrame {{endedSessionsWithMetadata}} is created by joining the "end" events to the previously created DataFrame. Here an "inner join" is used, since we only want output once a session has definitely ended.
> This code can be executed in {{spark-shell}}:
> {code:scala}
> import java.sql.Timestamp
> import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
> import org.apache.spark.sql.streaming.StreamingQuery
> import org.apache.spark.sql.{DataFrame, SQLContext}
> import org.apache.spark.sql.functions.{col, expr, lit}
> import spark.implicits._
> implicit val sqlContext: SQLContext = spark.sqlContext
> // Main data processing, regardless whether batch or stream processing
> def process(
>     sessionStartEvents: DataFrame,
>     sessionOptionalMetadataEvents: DataFrame,
>     sessionEndEvents: DataFrame
> ): (DataFrame, DataFrame) = {
>   val sessionStartsWithMetadata: DataFrame = sessionStartEvents
>     .join(
>       sessionOptionalMetadataEvents,
>       sessionStartEvents("sessionId") === sessionOptionalMetadataEvents("sessionId") &&
>         sessionStartEvents("sessionStartTimestamp").between(
>           sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").minus(expr(s"INTERVAL 1 seconds")),
>           sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").plus(expr(s"INTERVAL 1 seconds"))
>         ),
>       "left" // metadata is optional
>     )
>     .select(
>       sessionStartEvents("sessionId"),
>       sessionStartEvents("sessionStartTimestamp"),
>       sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp")
>     )
>   val endedSessionsWithMetadata = sessionStartsWithMetadata.join(
>     sessionEndEvents,
>     sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") &&
>       sessionStartsWithMetadata("sessionStartTimestamp").between(
>         sessionEndEvents("sessionEndTimestamp").minus(expr(s"INTERVAL 10 seconds")),
>         sessionEndEvents("sessionEndTimestamp")
>       )
>   )
>   (sessionStartsWithMetadata, endedSessionsWithMetadata)
> }
> def streamProcessing(
>     sessionStartData: Seq[(Timestamp, Int)],
>     sessionOptionalMetadata: Seq[(Timestamp, Int)],
>     sessionEndData: Seq[(Timestamp, Int)]
> ): (StreamingQuery, StreamingQuery) = {
>   val sessionStartEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
>   sessionStartEventsStream.addData(sessionStartData)
>   val sessionStartEvents: DataFrame = sessionStartEventsStream
>     .toDS()
>     .toDF("sessionStartTimestamp", "sessionId")
>     .withWatermark("sessionStartTimestamp", "1 second")
>   val sessionOptionalMetadataEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
>   sessionOptionalMetadataEventsStream.addData(sessionOptionalMetadata)
>   val sessionOptionalMetadataEvents: DataFrame = sessionOptionalMetadataEventsStream
>     .toDS()
>     .toDF("sessionOptionalMetadataTimestamp", "sessionId")
>     .withWatermark("sessionOptionalMetadataTimestamp", "1 second")
>   val sessionEndEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
>   sessionEndEventsStream.addData(sessionEndData)
>   val sessionEndEvents: DataFrame = sessionEndEventsStream
>     .toDS()
>     .toDF("sessionEndTimestamp", "sessionId")
>     .withWatermark("sessionEndTimestamp", "1 second")
>   val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
>     process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)
>   val sessionStartsWithMetadataQuery = sessionStartsWithMetadata
>     .select(lit("sessionStartsWithMetadata"), col("*")) // Add label to see which query's output it is
>     .writeStream
>     .outputMode("append")
>     .format("console")
>     .option("truncate", "false")
>     .option("numRows", "1000")
>     .start()
>   val endedSessionsWithMetadataQuery = endedSessionsWithMetadata
>     .select(lit("endedSessionsWithMetadata"), col("*")) // Add label to see which query's output it is
>     .writeStream
>     .outputMode("append")
>     .format("console")
>     .option("truncate", "false")
>     .option("numRows", "1000")
>     .start()
>   (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery)
> }
> def batchProcessing(
>     sessionStartData: Seq[(Timestamp, Int)],
>     sessionOptionalMetadata: Seq[(Timestamp, Int)],
>     sessionEndData: Seq[(Timestamp, Int)]
> ): Unit = {
>   val sessionStartEvents = spark.createDataset(sessionStartData).toDF("sessionStartTimestamp", "sessionId")
>   val sessionOptionalMetadataEvents = spark.createDataset(sessionOptionalMetadata).toDF("sessionOptionalMetadataTimestamp", "sessionId")
>   val sessionEndEvents = spark.createDataset(sessionEndData).toDF("sessionEndTimestamp", "sessionId")
>   val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
>     process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)
>   println("sessionStartsWithMetadata")
>   sessionStartsWithMetadata.show(100, truncate = false)
>   println("endedSessionsWithMetadata")
>   endedSessionsWithMetadata.show(100, truncate = false)
> }
> // Data is represented as tuples of (eventTime, sessionId)...
> val sessionStartData = Vector(
>   (new Timestamp(1), 0),
>   (new Timestamp(2000), 1),
>   (new Timestamp(2000), 2),
>   (new Timestamp(20000), 10)
> )
> val sessionOptionalMetadata = Vector(
>   (new Timestamp(1), 0),
>   // session `1` has no metadata
>   (new Timestamp(2000), 2),
>   (new Timestamp(20000), 10)
> )
> val sessionEndData = Vector(
>   (new Timestamp(10000), 0),
>   (new Timestamp(11000), 1),
>   (new Timestamp(12000), 2),
>   (new Timestamp(30000), 10)
> )
> batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
> val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) =
>   streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
> {code}
> In the example, the session with ID {{1}} has no metadata, so the respective metadata column is {{null}}.
> The main functionality of joining the data is implemented in {{def process(…)}}, which is called with both batch data and stream data.
> In the batch version the output is as expected:
> {noformat}
> sessionStartsWithMetadata
> +---------+-----------------------+--------------------------------+
> |sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|
> +---------+-----------------------+--------------------------------+
> |0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |
> |1 |1970-01-01 01:00:02 |null | ← has no metadata ✔
> |2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |
> |10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |
> +---------+-----------------------+--------------------------------+
> endedSessionsWithMetadata
> +---------+-----------------------+--------------------------------+-------------------+---------+
> |sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
> +---------+-----------------------+--------------------------------+-------------------+---------+
> |0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 |
> |1 |1970-01-01 01:00:02 |null |1970-01-01 01:00:11|1 | ← has no metadata ✔
> |2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 |
> |10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 |
> +---------+-----------------------+--------------------------------+-------------------+---------+
> {noformat}
> But when the same processing is run as stream processing, the output of {{endedSessionsWithMetadata}} does not contain the entry for session {{1}}, which has no metadata:
> {noformat}
> -------------------------------------------
> Batch: 0 ("start event")
> -------------------------------------------
> +-------------------------+---------+-----------------------+--------------------------------+
> |sessionStartsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|
> +-------------------------+---------+-----------------------+--------------------------------+
> |sessionStartsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |
> |sessionStartsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |
> |sessionStartsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |
> +-------------------------+---------+-----------------------+--------------------------------+
> -------------------------------------------
> Batch: 0 ("end event")
> -------------------------------------------
> +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
> |endedSessionsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
> +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
> |endedSessionsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 |
> |endedSessionsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 |
> |endedSessionsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 |
> +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
> -------------------------------------------
> Batch: 1 ("start event")
> -------------------------------------------
> +-------------------------+---------+---------------------+--------------------------------+
> |sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|
> +-------------------------+---------+---------------------+--------------------------------+
> |sessionStartsWithMetadata|1 |1970-01-01 01:00:02 |null | ← has no metadata ✔
> +-------------------------+---------+---------------------+--------------------------------+
> -------------------------------------------
> Batch: 1 ("end event")
> -------------------------------------------
> +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
> |endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
> +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
> +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
> ↳ ✘ here I would have expected a line with sessionId=1, that has "start" and "end" information, but no "metadata" ✘
> {noformat}
> In a response it was suggested that the issue looks related to [~kabhwan]'s [mailing list post|http://apache-spark-developers-list.1001551.n3.nabble.com/correctness-issue-on-chained-streaming-streaming-join-td27358.html], but since I couldn't find a ticket here tracking the above-mentioned issue, I'm creating this one.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org