You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Jungtaek Lim (JIRA)" <ji...@apache.org> on 2019/04/11 08:45:00 UTC

[jira] [Commented] (SPARK-26154) Stream-stream joins - left outer join gives inconsistent output

    [ https://issues.apache.org/jira/browse/SPARK-26154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815211#comment-16815211 ] 

Jungtaek Lim commented on SPARK-26154:
--------------------------------------

This issue is reported again: SPARK-27433.

I'd say it should be a blocker for Spark 3.0.0. This slips two bugfix releases, and now major release is coming which I think it will slip again even for major release unless it is marked as a blocker.

Someone could say it's not a regression so it is not necessarily a blocker, but given we don't concern about non-blocker issues when releasing, only labelling as 'correctness' never works.

[http://spark.apache.org/contributing.html] page clearly describes that correctness issues should be considered blockers, and this issue has no workaround from Spark side which doesn't fit to mark as 'critical'.
 # Blocker: pointless to release without this change as the release would be unusable to a large minority of users. Correctness and data loss issues should be considered Blockers.
 # Critical: a large minority of users are missing important functionality without this, and/or a workaround is difficult

I'll mark this as a blocker: hope the patch will be reviewed soon.

> Stream-stream joins - left outer join gives inconsistent output
> ---------------------------------------------------------------
>
>                 Key: SPARK-26154
>                 URL: https://issues.apache.org/jira/browse/SPARK-26154
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.2
>         Environment: Spark version - Spark 2.3.2
> OS- Suse 11
>            Reporter: Haripriya
>            Priority: Critical
>              Labels: correctness
>
> Stream-stream joins using left outer join gives inconsistent  output 
> The data processed once, is being processed again and gives null value. In Batch 2, the input data  "3" is processed. But again in batch 6, null value is provided for same data
> Steps
> In spark-shell
> {code:java}
> scala>     import org.apache.spark.sql.functions.{col, expr}
> import org.apache.spark.sql.functions.{col, expr}
> scala>     import org.apache.spark.sql.streaming.Trigger
> import org.apache.spark.sql.streaming.Trigger
> scala>     val lines_stream1 = spark.readStream.
>      |       format("kafka").
>      |       option("kafka.bootstrap.servers", "ip:9092").
>      |       option("subscribe", "topic1").
>      |       option("includeTimestamp", true).
>      |       load().
>      |       selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
>      |       select(col("value") as("data"),col("timestamp") as("recordTime")).
>      |       select("data","recordTime").
>      |       withWatermark("recordTime", "5 seconds ")
> lines_stream1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data: string, recordTime: timestamp]
> scala>     val lines_stream2 = spark.readStream.
>      |       format("kafka").
>      |       option("kafka.bootstrap.servers", "ip:9092").
>      |       option("subscribe", "topic2").
>      |       option("includeTimestamp", value = true).
>      |       load().
>      |       selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
>      |       select(col("value") as("data1"),col("timestamp") as("recordTime1")).
>      |       select("data1","recordTime1").
>      |       withWatermark("recordTime1", "10 seconds ")
> lines_stream2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data1: string, recordTime1: timestamp]
> scala>     val query = lines_stream1.join(lines_stream2, expr (
>      |       """
>      |         | data == data1 and
>      |         | recordTime1 >= recordTime and
>      |         | recordTime1 <= recordTime + interval 5 seconds
>      |       """.stripMargin),"left").
>      |       writeStream.
>      |       option("truncate","false").
>      |       outputMode("append").
>      |       format("console").option("checkpointLocation", "/tmp/leftouter/").
>      |       trigger(Trigger.ProcessingTime ("5 seconds")).
>      |       start()
> query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1a48f55b
> {code}
> Step2 : Start producing data
> kafka-console-producer.sh --broker-list ip:9092 --topic topic1
>  >1
>  >2
>  >3
>  >4
>  >5
>  >aa
>  >bb
>  >cc
> kafka-console-producer.sh --broker-list ip:9092 --topic topic2
>  >2
>  >2
>  >3
>  >4
>  >5
>  >aa
>  >cc
>  >ee
>  >ee
>  
> Output obtained:
> {code:java}
> Batch: 0
> -------------------------------------------
> +----+----------+-----+-----------+
> |data|recordTime|data1|recordTime1|
> +----+----------+-----+-----------+
> +----+----------+-----+-----------+
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +----+----------+-----+-----------+
> |data|recordTime|data1|recordTime1|
> +----+----------+-----+-----------+
> +----+----------+-----+-----------+
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |3   |2018-11-22 20:09:35.053|3    |2018-11-22 20:09:36.506|
> |2   |2018-11-22 20:09:31.613|2    |2018-11-22 20:09:33.116|
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |4   |2018-11-22 20:09:38.654|4    |2018-11-22 20:09:39.818|
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 4
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |5   |2018-11-22 20:09:44.809|5    |2018-11-22 20:09:47.452|
> |1   |2018-11-22 20:09:22.662|null |null                   |
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 5
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |cc  |2018-11-22 20:10:06.654|cc   |2018-11-22 20:10:08.701|
> |aa  |2018-11-22 20:10:01.536|aa   |2018-11-22 20:10:03.259|
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 6
> -------------------------------------------
> +----+-----------------------+-----+-----------+
> |data|recordTime             |data1|recordTime1|
> +----+-----------------------+-----+-----------+
> |3   |2018-11-22 20:09:35.053|null |null       |
> +----+-----------------------+-----+-----------+
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org