You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Michael (Jira)" <ji...@apache.org> on 2020/10/27 09:53:00 UTC
[jira] [Updated] (SPARK-33259) Joining 3 streams results in
incorrect output
[ https://issues.apache.org/jira/browse/SPARK-33259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael updated SPARK-33259:
----------------------------
Description:
I encountered an issue with Structured Streaming when doing a ((A LEFT JOIN B) JOIN C) operation. Below you can see example code I [posted on Stackoverflow|https://stackoverflow.com/questions/64503539/]...
I created a minimal example of "sessions" that have "start" and "end" events and optionally some "metadata".
The script generates two outputs: the {{sessionStartsWithMetadata}} result comes from "start" events that are left-joined with the "metadata" events, based on {{sessionId}}. A "left join" is used, since we would like to get an output event even when no corresponding metadata exists.
Additionally a DataFrame {{endedSessionsWithMetadata}} is created by joining "end" events to the previously created DataFrame. Here an "inner join" is used, since we only want some output when a session has ended for sure.
This code can be executed in {{spark-shell}}:
{code:scala}
import java.sql.Timestamp
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{col, expr, lit}
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext
// Main data processing, regardless whether batch or stream processing
def process(
sessionStartEvents: DataFrame,
sessionOptionalMetadataEvents: DataFrame,
sessionEndEvents: DataFrame
): (DataFrame, DataFrame) = {
val sessionStartsWithMetadata: DataFrame = sessionStartEvents
.join(
sessionOptionalMetadataEvents,
sessionStartEvents("sessionId") === sessionOptionalMetadataEvents("sessionId") &&
sessionStartEvents("sessionStartTimestamp").between(
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").minus(expr(s"INTERVAL 1 seconds")),
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").plus(expr(s"INTERVAL 1 seconds"))
),
"left" // metadata is optional
)
.select(
sessionStartEvents("sessionId"),
sessionStartEvents("sessionStartTimestamp"),
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp")
)
val endedSessionsWithMetadata = sessionStartsWithMetadata.join(
sessionEndEvents,
sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") &&
sessionStartsWithMetadata("sessionStartTimestamp").between(
sessionEndEvents("sessionEndTimestamp").minus(expr(s"INTERVAL 10 seconds")),
sessionEndEvents("sessionEndTimestamp")
)
)
(sessionStartsWithMetadata, endedSessionsWithMetadata)
}
def streamProcessing(
sessionStartData: Seq[(Timestamp, Int)],
sessionOptionalMetadata: Seq[(Timestamp, Int)],
sessionEndData: Seq[(Timestamp, Int)]
): (StreamingQuery, StreamingQuery) = {
val sessionStartEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
sessionStartEventsStream.addData(sessionStartData)
val sessionStartEvents: DataFrame = sessionStartEventsStream
.toDS()
.toDF("sessionStartTimestamp", "sessionId")
.withWatermark("sessionStartTimestamp", "1 second")
val sessionOptionalMetadataEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
sessionOptionalMetadataEventsStream.addData(sessionOptionalMetadata)
val sessionOptionalMetadataEvents: DataFrame = sessionOptionalMetadataEventsStream
.toDS()
.toDF("sessionOptionalMetadataTimestamp", "sessionId")
.withWatermark("sessionOptionalMetadataTimestamp", "1 second")
val sessionEndEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
sessionEndEventsStream.addData(sessionEndData)
val sessionEndEvents: DataFrame = sessionEndEventsStream
.toDS()
.toDF("sessionEndTimestamp", "sessionId")
.withWatermark("sessionEndTimestamp", "1 second")
val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)
val sessionStartsWithMetadataQuery = sessionStartsWithMetadata
.select(lit("sessionStartsWithMetadata"), col("*")) // Add label to see which query's output it is
.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.option("numRows", "1000")
.start()
val endedSessionsWithMetadataQuery = endedSessionsWithMetadata
.select(lit("endedSessionsWithMetadata"), col("*")) // Add label to see which query's output it is
.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.option("numRows", "1000")
.start()
(sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery)
}
def batchProcessing(
sessionStartData: Seq[(Timestamp, Int)],
sessionOptionalMetadata: Seq[(Timestamp, Int)],
sessionEndData: Seq[(Timestamp, Int)]
): Unit = {
val sessionStartEvents = spark.createDataset(sessionStartData).toDF("sessionStartTimestamp", "sessionId")
val sessionOptionalMetadataEvents = spark.createDataset(sessionOptionalMetadata).toDF("sessionOptionalMetadataTimestamp", "sessionId")
val sessionEndEvents = spark.createDataset(sessionEndData).toDF("sessionEndTimestamp", "sessionId")
val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)
println("sessionStartsWithMetadata")
sessionStartsWithMetadata.show(100, truncate = false)
println("endedSessionsWithMetadata")
endedSessionsWithMetadata.show(100, truncate = false)
}
// Data is represented as tuples of (eventTime, sessionId)...
val sessionStartData = Vector(
(new Timestamp(1), 0),
(new Timestamp(2000), 1),
(new Timestamp(2000), 2),
(new Timestamp(20000), 10)
)
val sessionOptionalMetadata = Vector(
(new Timestamp(1), 0),
// session `1` has no metadata
(new Timestamp(2000), 2),
(new Timestamp(20000), 10)
)
val sessionEndData = Vector(
(new Timestamp(10000), 0),
(new Timestamp(11000), 1),
(new Timestamp(12000), 2),
(new Timestamp(30000), 10)
)
batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) =
streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
{code}
In the example session with ID {{1}} has no metadata, so the respective metadata column is {{null}}.
The main functionality of joining the data is implemented in {{def process(…)}}, which is called using both batch data and stream data.
In the batch version the output is as expected:
{noformat}
sessionStartsWithMetadata
+---------+-----------------------+--------------------------------+
|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|
+---------+-----------------------+--------------------------------+
|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |
|1 |1970-01-01 01:00:02 |null | ← has no metadata ✔
|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |
|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |
+---------+-----------------------+--------------------------------+
endedSessionsWithMetadata
+---------+-----------------------+--------------------------------+-------------------+---------+
|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+---------+-----------------------+--------------------------------+-------------------+---------+
|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 |
|1 |1970-01-01 01:00:02 |null |1970-01-01 01:00:11|1 | ← has no metadata ✔
|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 |
|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 |
+---------+-----------------------+--------------------------------+-------------------+---------+
{noformat}
But when the same processing is run as stream processing, the output of {{endedSessionsWithMetadata}} does not contain the entry for session {{1}}, which has no metadata:
{noformat}
-------------------------------------------
Batch: 0 ("start event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |
|sessionStartsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |
|sessionStartsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |
+-------------------------+---------+-----------------------+--------------------------------+
-------------------------------------------
Batch: 0 ("end event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 |
|endedSessionsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 |
|endedSessionsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 |
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
-------------------------------------------
Batch: 1 ("start event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|1 |1970-01-01 01:00:02 |null | ← has no metadata ✔
+-------------------------+---------+---------------------+--------------------------------+
-------------------------------------------
Batch: 1 ("end event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
↳ ✘ here I would have expected a line with sessionId=1, that has "start" and "end" information, but no "metadata" ✘
{noformat}
In a response it was suggested that the issue looks related to [this mailing list post|http://apache-spark-developers-list.1001551.n3.nabble.com/correctness-issue-on-chained-streaming-streaming-join-td27358.html], but since I couldn't find a ticket here tracking the above-mentioned issue, I'm creating this one.
was:
I encountered an issue with Structured Streaming when doing a ((A LEFT JOIN B) JOIN C) operation. Below you can see example code I [posted on Stackoverflow|https://stackoverflow.com/questions/64503539/]...
I created a minimal example of "sessions", that have "start" and "end" events and optionally some "metadata".
The script generates two outputs: `sessionStartsWithMetadata` result from "start" events that are left-joined with the "metadata" events, based on `sessionId`. A "left join" is used, since we like to get an output event even when no corresponding metadata exists.
Additionally a DataFrame `endedSessionsWithMetadata` is created by joining "end" events to the previously created DataFrame. Here an "inner join" is used, since we only want some output when a session has ended for sure.
This code can be executed in `spark-shell`:
{code:scala}
import java.sql.Timestamp
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{col, expr, lit}
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext
// Main data processing, regardless whether batch or stream processing
def process(
sessionStartEvents: DataFrame,
sessionOptionalMetadataEvents: DataFrame,
sessionEndEvents: DataFrame
): (DataFrame, DataFrame) = {
val sessionStartsWithMetadata: DataFrame = sessionStartEvents
.join(
sessionOptionalMetadataEvents,
sessionStartEvents("sessionId") === sessionOptionalMetadataEvents("sessionId") &&
sessionStartEvents("sessionStartTimestamp").between(
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").minus(expr(s"INTERVAL 1 seconds")),
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").plus(expr(s"INTERVAL 1 seconds"))
),
"left" // metadata is optional
)
.select(
sessionStartEvents("sessionId"),
sessionStartEvents("sessionStartTimestamp"),
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp")
)
val endedSessionsWithMetadata = sessionStartsWithMetadata.join(
sessionEndEvents,
sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") &&
sessionStartsWithMetadata("sessionStartTimestamp").between(
sessionEndEvents("sessionEndTimestamp").minus(expr(s"INTERVAL 10 seconds")),
sessionEndEvents("sessionEndTimestamp")
)
)
(sessionStartsWithMetadata, endedSessionsWithMetadata)
}
def streamProcessing(
sessionStartData: Seq[(Timestamp, Int)],
sessionOptionalMetadata: Seq[(Timestamp, Int)],
sessionEndData: Seq[(Timestamp, Int)]
): (StreamingQuery, StreamingQuery) = {
val sessionStartEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
sessionStartEventsStream.addData(sessionStartData)
val sessionStartEvents: DataFrame = sessionStartEventsStream
.toDS()
.toDF("sessionStartTimestamp", "sessionId")
.withWatermark("sessionStartTimestamp", "1 second")
val sessionOptionalMetadataEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
sessionOptionalMetadataEventsStream.addData(sessionOptionalMetadata)
val sessionOptionalMetadataEvents: DataFrame = sessionOptionalMetadataEventsStream
.toDS()
.toDF("sessionOptionalMetadataTimestamp", "sessionId")
.withWatermark("sessionOptionalMetadataTimestamp", "1 second")
val sessionEndEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
sessionEndEventsStream.addData(sessionEndData)
val sessionEndEvents: DataFrame = sessionEndEventsStream
.toDS()
.toDF("sessionEndTimestamp", "sessionId")
.withWatermark("sessionEndTimestamp", "1 second")
val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)
val sessionStartsWithMetadataQuery = sessionStartsWithMetadata
.select(lit("sessionStartsWithMetadata"), col("*")) // Add label to see which query's output it is
.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.option("numRows", "1000")
.start()
val endedSessionsWithMetadataQuery = endedSessionsWithMetadata
.select(lit("endedSessionsWithMetadata"), col("*")) // Add label to see which query's output it is
.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.option("numRows", "1000")
.start()
(sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery)
}
def batchProcessing(
sessionStartData: Seq[(Timestamp, Int)],
sessionOptionalMetadata: Seq[(Timestamp, Int)],
sessionEndData: Seq[(Timestamp, Int)]
): Unit = {
val sessionStartEvents = spark.createDataset(sessionStartData).toDF("sessionStartTimestamp", "sessionId")
val sessionOptionalMetadataEvents = spark.createDataset(sessionOptionalMetadata).toDF("sessionOptionalMetadataTimestamp", "sessionId")
val sessionEndEvents = spark.createDataset(sessionEndData).toDF("sessionEndTimestamp", "sessionId")
val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)
println("sessionStartsWithMetadata")
sessionStartsWithMetadata.show(100, truncate = false)
println("endedSessionsWithMetadata")
endedSessionsWithMetadata.show(100, truncate = false)
}
// Data is represented as tuples of (eventTime, sessionId)...
val sessionStartData = Vector(
(new Timestamp(1), 0),
(new Timestamp(2000), 1),
(new Timestamp(2000), 2),
(new Timestamp(20000), 10)
)
val sessionOptionalMetadata = Vector(
(new Timestamp(1), 0),
// session `1` has no metadata
(new Timestamp(2000), 2),
(new Timestamp(20000), 10)
)
val sessionEndData = Vector(
(new Timestamp(10000), 0),
(new Timestamp(11000), 1),
(new Timestamp(12000), 2),
(new Timestamp(30000), 10)
)
batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) =
streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
{code}
In the example session with ID `1` has no metadata, so the respective metadata column is `null`.
The main functionality of joining the data is implemented in `def process(…)`, which is called using both batch data and stream data.
In the batch version the output is as expected:
{noformat}
sessionStartsWithMetadata
+---------+-----------------------+--------------------------------+
|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|
+---------+-----------------------+--------------------------------+
|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |
|1 |1970-01-01 01:00:02 |null | ← has no metadata ✔
|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |
|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |
+---------+-----------------------+--------------------------------+
endedSessionsWithMetadata
+---------+-----------------------+--------------------------------+-------------------+---------+
|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+---------+-----------------------+--------------------------------+-------------------+---------+
|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 |
|1 |1970-01-01 01:00:02 |null |1970-01-01 01:00:11|1 | ← has no metadata ✔
|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 |
|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 |
+---------+-----------------------+--------------------------------+-------------------+---------+
{noformat}
But when the same processing is run as stream processing the output of `endedSessionsWithMetadata` does not contain the entry of session `1` that has no metadata:
{noformat}
-------------------------------------------
Batch: 0 ("start event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |
|sessionStartsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |
|sessionStartsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |
+-------------------------+---------+-----------------------+--------------------------------+
-------------------------------------------
Batch: 0 ("end event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 |
|endedSessionsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 |
|endedSessionsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 |
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
-------------------------------------------
Batch: 1 ("start event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|1 |1970-01-01 01:00:02 |null | ← has no metadata ✔
+-------------------------+---------+---------------------+--------------------------------+
-------------------------------------------
Batch: 1 ("end event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
↳ ✘ here I would have expected a line with sessionId=1, that has "start" and "end" information, but no "metadata" ✘
{noformat}
In a response it was suggested the issue looks related to [this mailing list post|http://apache-spark-developers-list.1001551.n3.nabble.com/correctness-issue-on-chained-streaming-streaming-join-td27358.html], but since I couldn't find a ticket here tracking the above mentioned issue, I'm creating this one.
> Joining 3 streams results in incorrect output
> ---------------------------------------------
>
> Key: SPARK-33259
> URL: https://issues.apache.org/jira/browse/SPARK-33259
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.0.1
> Reporter: Michael
> Priority: Major
>
> I encountered an issue with Structured Streaming when doing a ((A LEFT JOIN B) JOIN C) operation. Below you can see example code I [posted on Stackoverflow|https://stackoverflow.com/questions/64503539/]...
> I created a minimal example of "sessions", that have "start" and "end" events and optionally some "metadata".
> The script generates two outputs: {{sessionStartsWithMetadata}} result from "start" events that are left-joined with the "metadata" events, based on {{sessionId}}. A "left join" is used, since we like to get an output event even when no corresponding metadata exists.
> Additionally a DataFrame {{endedSessionsWithMetadata}} is created by joining "end" events to the previously created DataFrame. Here an "inner join" is used, since we only want some output when a session has ended for sure.
> This code can be executed in {{spark-shell}}:
> {code:scala}
> import java.sql.Timestamp
> import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
> import org.apache.spark.sql.streaming.StreamingQuery
> import org.apache.spark.sql.{DataFrame, SQLContext}
> import org.apache.spark.sql.functions.{col, expr, lit}
> import spark.implicits._
> implicit val sqlContext: SQLContext = spark.sqlContext
> // Main data processing, regardless whether batch or stream processing
> def process(
> sessionStartEvents: DataFrame,
> sessionOptionalMetadataEvents: DataFrame,
> sessionEndEvents: DataFrame
> ): (DataFrame, DataFrame) = {
> val sessionStartsWithMetadata: DataFrame = sessionStartEvents
> .join(
> sessionOptionalMetadataEvents,
> sessionStartEvents("sessionId") === sessionOptionalMetadataEvents("sessionId") &&
> sessionStartEvents("sessionStartTimestamp").between(
> sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").minus(expr(s"INTERVAL 1 seconds")),
> sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").plus(expr(s"INTERVAL 1 seconds"))
> ),
> "left" // metadata is optional
> )
> .select(
> sessionStartEvents("sessionId"),
> sessionStartEvents("sessionStartTimestamp"),
> sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp")
> )
> val endedSessionsWithMetadata = sessionStartsWithMetadata.join(
> sessionEndEvents,
> sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") &&
> sessionStartsWithMetadata("sessionStartTimestamp").between(
> sessionEndEvents("sessionEndTimestamp").minus(expr(s"INTERVAL 10 seconds")),
> sessionEndEvents("sessionEndTimestamp")
> )
> )
> (sessionStartsWithMetadata, endedSessionsWithMetadata)
> }
> def streamProcessing(
> sessionStartData: Seq[(Timestamp, Int)],
> sessionOptionalMetadata: Seq[(Timestamp, Int)],
> sessionEndData: Seq[(Timestamp, Int)]
> ): (StreamingQuery, StreamingQuery) = {
> val sessionStartEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
> sessionStartEventsStream.addData(sessionStartData)
> val sessionStartEvents: DataFrame = sessionStartEventsStream
> .toDS()
> .toDF("sessionStartTimestamp", "sessionId")
> .withWatermark("sessionStartTimestamp", "1 second")
> val sessionOptionalMetadataEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
> sessionOptionalMetadataEventsStream.addData(sessionOptionalMetadata)
> val sessionOptionalMetadataEvents: DataFrame = sessionOptionalMetadataEventsStream
> .toDS()
> .toDF("sessionOptionalMetadataTimestamp", "sessionId")
> .withWatermark("sessionOptionalMetadataTimestamp", "1 second")
> val sessionEndEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
> sessionEndEventsStream.addData(sessionEndData)
> val sessionEndEvents: DataFrame = sessionEndEventsStream
> .toDS()
> .toDF("sessionEndTimestamp", "sessionId")
> .withWatermark("sessionEndTimestamp", "1 second")
> val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
> process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)
> val sessionStartsWithMetadataQuery = sessionStartsWithMetadata
> .select(lit("sessionStartsWithMetadata"), col("*")) // Add label to see which query's output it is
> .writeStream
> .outputMode("append")
> .format("console")
> .option("truncate", "false")
> .option("numRows", "1000")
> .start()
> val endedSessionsWithMetadataQuery = endedSessionsWithMetadata
> .select(lit("endedSessionsWithMetadata"), col("*")) // Add label to see which query's output it is
> .writeStream
> .outputMode("append")
> .format("console")
> .option("truncate", "false")
> .option("numRows", "1000")
> .start()
> (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery)
> }
> def batchProcessing(
> sessionStartData: Seq[(Timestamp, Int)],
> sessionOptionalMetadata: Seq[(Timestamp, Int)],
> sessionEndData: Seq[(Timestamp, Int)]
> ): Unit = {
> val sessionStartEvents = spark.createDataset(sessionStartData).toDF("sessionStartTimestamp", "sessionId")
> val sessionOptionalMetadataEvents = spark.createDataset(sessionOptionalMetadata).toDF("sessionOptionalMetadataTimestamp", "sessionId")
> val sessionEndEvents = spark.createDataset(sessionEndData).toDF("sessionEndTimestamp", "sessionId")
> val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
> process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)
> println("sessionStartsWithMetadata")
> sessionStartsWithMetadata.show(100, truncate = false)
> println("endedSessionsWithMetadata")
> endedSessionsWithMetadata.show(100, truncate = false)
> }
> // Data is represented as tuples of (eventTime, sessionId)...
> val sessionStartData = Vector(
> (new Timestamp(1), 0),
> (new Timestamp(2000), 1),
> (new Timestamp(2000), 2),
> (new Timestamp(20000), 10)
> )
> val sessionOptionalMetadata = Vector(
> (new Timestamp(1), 0),
> // session `1` has no metadata
> (new Timestamp(2000), 2),
> (new Timestamp(20000), 10)
> )
> val sessionEndData = Vector(
> (new Timestamp(10000), 0),
> (new Timestamp(11000), 1),
> (new Timestamp(12000), 2),
> (new Timestamp(30000), 10)
> )
> batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
> val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) =
> streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
> {code}
> In the example session with ID {{1}} has no metadata, so the respective metadata column is {{null}}.
> The main functionality of joining the data is implemented in {{def process(…)}}, which is called using both batch data and stream data.
> In the batch version the output is as expected:
> {noformat}
> sessionStartsWithMetadata
> +---------+-----------------------+--------------------------------+
> |sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|
> +---------+-----------------------+--------------------------------+
> |0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |
> |1 |1970-01-01 01:00:02 |null | ← has no metadata ✔
> |2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |
> |10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |
> +---------+-----------------------+--------------------------------+
> endedSessionsWithMetadata
> +---------+-----------------------+--------------------------------+-------------------+---------+
> |sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
> +---------+-----------------------+--------------------------------+-------------------+---------+
> |0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 |
> |1 |1970-01-01 01:00:02 |null |1970-01-01 01:00:11|1 | ← has no metadata ✔
> |2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 |
> |10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 |
> +---------+-----------------------+--------------------------------+-------------------+---------+
> {noformat}
> But when the same processing is run as stream processing the output of {{endedSessionsWithMetadata}} does not contain the entry of session {{1}} that has no metadata:
> {noformat}
> -------------------------------------------
> Batch: 0 ("start event")
> -------------------------------------------
> +-------------------------+---------+-----------------------+--------------------------------+
> |sessionStartsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|
> +-------------------------+---------+-----------------------+--------------------------------+
> |sessionStartsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |
> |sessionStartsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |
> |sessionStartsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |
> +-------------------------+---------+-----------------------+--------------------------------+
> -------------------------------------------
> Batch: 0 ("end event")
> -------------------------------------------
> +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
> |endedSessionsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
> +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
> |endedSessionsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 |
> |endedSessionsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 |
> |endedSessionsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 |
> +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
> -------------------------------------------
> Batch: 1 ("start event")
> -------------------------------------------
> +-------------------------+---------+---------------------+--------------------------------+
> |sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|
> +-------------------------+---------+---------------------+--------------------------------+
> |sessionStartsWithMetadata|1 |1970-01-01 01:00:02 |null | ← has no metadata ✔
> +-------------------------+---------+---------------------+--------------------------------+
> -------------------------------------------
> Batch: 1 ("end event")
> -------------------------------------------
> +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
> |endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
> +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
> +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
> ↳ ✘ here I would have expected a line with sessionId=1, that has "start" and "end" information, but no "metadata" ✘
> {noformat}
> In a response it was suggested the issue looks related to [this mailing list post|http://apache-spark-developers-list.1001551.n3.nabble.com/correctness-issue-on-chained-streaming-streaming-join-td27358.html], but since I couldn't find a ticket here tracking the above mentioned issue, I'm creating this one.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org