You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Jungtaek Lim (Jira)" <ji...@apache.org> on 2020/01/22 23:54:00 UTC

[jira] [Commented] (SPARK-26154) Stream-stream joins - left outer join gives inconsistent output

    [ https://issues.apache.org/jira/browse/SPARK-26154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021627#comment-17021627 ] 

Jungtaek Lim commented on SPARK-26154:
--------------------------------------

Leaving information why this issue cannot be easily ported back to 2.x version line although it fixes the correctness.

 

We fixed the issue with "destructive way" on existing query as the state in existing query cannot be corrected. In 3.0 migration guide we added below content:
{quote}Spark 3.0 fixes the correctness issue on Stream-stream outer join, which changes the schema of state. (SPARK-26154 for more details) Spark 3.0 will fail the query if you start your query from checkpoint constructed from Spark 2.x which uses stream-stream outer join. Please discard the checkpoint and replay previous inputs to recalculate outputs.
{quote}
End users might not be mad if "major" version requires them to lose something. They still have Spark 2.x version line to deny the change and take the risk (as the issue is occurred from "edge-case"). If we put this to 2.x version line they may have no way to deny.

Please note that unlike other states having versions which can be co-existed, for stream-stream outer join, there're only "valid" (state format version = 2) and "invalid" (state format version = 1) state format which cannot be co-existed.

We cannot still simply drop state format version 1, because stream-stream inner join is not affected by this bug and we don't want to let the case also discard the state. It will affect too many existing queries and I'd like to reduce the impact.

> Stream-stream joins - left outer join gives inconsistent output
> ---------------------------------------------------------------
>
>                 Key: SPARK-26154
>                 URL: https://issues.apache.org/jira/browse/SPARK-26154
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.2, 3.0.0
>         Environment: Spark version - Spark 2.3.2
> OS- Suse 11
>            Reporter: Haripriya
>            Assignee: Jungtaek Lim
>            Priority: Blocker
>              Labels: correctness
>             Fix For: 3.0.0
>
>
> Stream-stream joins using left outer join gives inconsistent  output 
> The data processed once, is being processed again and gives null value. In Batch 2, the input data  "3" is processed. But again in batch 6, null value is provided for same data
> Steps
> In spark-shell
> {code:java}
> scala>     import org.apache.spark.sql.functions.{col, expr}
> import org.apache.spark.sql.functions.{col, expr}
> scala>     import org.apache.spark.sql.streaming.Trigger
> import org.apache.spark.sql.streaming.Trigger
> scala>     val lines_stream1 = spark.readStream.
>      |       format("kafka").
>      |       option("kafka.bootstrap.servers", "ip:9092").
>      |       option("subscribe", "topic1").
>      |       option("includeTimestamp", true).
>      |       load().
>      |       selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
>      |       select(col("value") as("data"),col("timestamp") as("recordTime")).
>      |       select("data","recordTime").
>      |       withWatermark("recordTime", "5 seconds ")
> lines_stream1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data: string, recordTime: timestamp]
> scala>     val lines_stream2 = spark.readStream.
>      |       format("kafka").
>      |       option("kafka.bootstrap.servers", "ip:9092").
>      |       option("subscribe", "topic2").
>      |       option("includeTimestamp", value = true).
>      |       load().
>      |       selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
>      |       select(col("value") as("data1"),col("timestamp") as("recordTime1")).
>      |       select("data1","recordTime1").
>      |       withWatermark("recordTime1", "10 seconds ")
> lines_stream2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data1: string, recordTime1: timestamp]
> scala>     val query = lines_stream1.join(lines_stream2, expr (
>      |       """
>      |         | data == data1 and
>      |         | recordTime1 >= recordTime and
>      |         | recordTime1 <= recordTime + interval 5 seconds
>      |       """.stripMargin),"left").
>      |       writeStream.
>      |       option("truncate","false").
>      |       outputMode("append").
>      |       format("console").option("checkpointLocation", "/tmp/leftouter/").
>      |       trigger(Trigger.ProcessingTime ("5 seconds")).
>      |       start()
> query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1a48f55b
> {code}
> Step2 : Start producing data
> kafka-console-producer.sh --broker-list ip:9092 --topic topic1
>  >1
>  >2
>  >3
>  >4
>  >5
>  >aa
>  >bb
>  >cc
> kafka-console-producer.sh --broker-list ip:9092 --topic topic2
>  >2
>  >2
>  >3
>  >4
>  >5
>  >aa
>  >cc
>  >ee
>  >ee
>  
> Output obtained:
> {code:java}
> Batch: 0
> -------------------------------------------
> +----+----------+-----+-----------+
> |data|recordTime|data1|recordTime1|
> +----+----------+-----+-----------+
> +----+----------+-----+-----------+
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +----+----------+-----+-----------+
> |data|recordTime|data1|recordTime1|
> +----+----------+-----+-----------+
> +----+----------+-----+-----------+
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |3   |2018-11-22 20:09:35.053|3    |2018-11-22 20:09:36.506|
> |2   |2018-11-22 20:09:31.613|2    |2018-11-22 20:09:33.116|
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |4   |2018-11-22 20:09:38.654|4    |2018-11-22 20:09:39.818|
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 4
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |5   |2018-11-22 20:09:44.809|5    |2018-11-22 20:09:47.452|
> |1   |2018-11-22 20:09:22.662|null |null                   |
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 5
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |cc  |2018-11-22 20:10:06.654|cc   |2018-11-22 20:10:08.701|
> |aa  |2018-11-22 20:10:01.536|aa   |2018-11-22 20:10:03.259|
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 6
> -------------------------------------------
> +----+-----------------------+-----+-----------+
> |data|recordTime             |data1|recordTime1|
> +----+-----------------------+-----+-----------+
> |3   |2018-11-22 20:09:35.053|null |null       |
> +----+-----------------------+-----+-----------+
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org