Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2019/01/24 09:07:00 UTC

[jira] [Assigned] (SPARK-26187) Stream-stream left outer join returns outer nulls for already matched rows

     [ https://issues.apache.org/jira/browse/SPARK-26187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-26187:
------------------------------------

    Assignee: Apache Spark

> Stream-stream left outer join returns outer nulls for already matched rows
> --------------------------------------------------------------------------
>
>                 Key: SPARK-26187
>                 URL: https://issues.apache.org/jira/browse/SPARK-26187
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.2
>            Reporter: Pavel Chernikov
>            Assignee: Apache Spark
>            Priority: Major
>
> This is essentially the same issue as SPARK-26154, but with a more concrete and more easily reproducible example:
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
>
> // Assumed local session; the report does not show how `session` is created.
> val session = SparkSession.builder().master("local[*]").appName("SPARK-26187").getOrCreate()
>
> val rateStream = session.readStream
>  .format("rate")
>  .option("rowsPerSecond", 1)
>  .option("numPartitions", 1)
>  .load()
> val fooStream = rateStream
>  .select(col("value").as("fooId"), col("timestamp").as("fooTime"))
> val barStream = rateStream
>  // Keep only even values so that odd fooIds never match (makes misses easy to spot)
>  .where(col("value") % 2 === 0)
>  .select(col("value").as("barId"), col("timestamp").as("barTime")){code}
> If the time-range condition requires each barStream event to occur no later than its matching fooStream event, everything works correctly, and no previously matched record is emitted again with outer NULLs:
> {code:java}
> val query = fooStream
>  .withWatermark("fooTime", "5 seconds")
>  .join(
>    barStream.withWatermark("barTime", "5 seconds"),
>    expr("""
>      barId = fooId AND
>      fooTime >= barTime AND
>      fooTime <= barTime + interval 5 seconds
>         """),
>    joinType = "leftOuter"
>  )
>  .writeStream
>  .format("console")
>  .option("truncate", false)
>  .start(){code}
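The expected left outer join semantics can be checked without Spark. The following is a minimal Scala sketch, not Spark's implementation: `Foo`, `Bar`, `cond1`, `leftOuter` and `sampleData` are hypothetical names, and it assumes (as with the rate source above) one row per second and fooTime == barTime for equal IDs, since both sides are derived from the same rate row. Every fooId should then be emitted exactly once: matched for even IDs, with a NULL (`None`) right side for odd IDs.

```scala
// Spark-free model of the first query's join condition and of left-outer semantics.
object LeftOuterModel {
  final case class Foo(fooId: Long, fooTime: Long) // fooTime in epoch millis
  final case class Bar(barId: Long, barTime: Long) // barTime in epoch millis

  val RangeMs: Long = 5000L // "interval 5 seconds"

  // barId = fooId AND fooTime >= barTime AND fooTime <= barTime + interval 5 seconds
  def cond1(f: Foo, b: Bar): Boolean =
    b.barId == f.fooId && f.fooTime >= b.barTime && f.fooTime <= b.barTime + RangeMs

  // Left outer join: all matching pairs, plus one null-padded row per unmatched left row.
  def leftOuter(foos: Seq[Foo], bars: Seq[Bar], cond: (Foo, Bar) => Boolean): Seq[(Foo, Option[Bar])] =
    foos.flatMap { f =>
      val matches = bars.filter(b => cond(f, b))
      if (matches.isEmpty) Seq(f -> Option.empty[Bar]) else matches.map(b => f -> Option(b))
    }

  // One row per second; bar keeps only even IDs with the same timestamp as foo.
  def sampleData(n: Int): (Seq[Foo], Seq[Bar]) = {
    val foos = (0L until n.toLong).map(i => Foo(i, i * 1000L))
    val bars = foos.filter(_.fooId % 2 == 0).map(f => Bar(f.fooId, f.fooTime))
    (foos, bars)
  }
}
```

For example, `LeftOuterModel.leftOuter(foos, bars, LeftOuterModel.cond1)` on `sampleData(6)` yields six rows, one per fooId, with `Some(bar)` for 0, 2 and 4 and `None` for 1, 3 and 5.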
> As expected, only the rows with odd IDs, which have no match, are emitted with NULLs on the right side:
> {code:java}
> [info] Batch: 1 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |fooId|fooTime                |barId|barTime                | 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |0    |2018-11-27 13:12:34.976|0    |2018-11-27 13:12:34.976| 
> [info] |6    |2018-11-27 13:12:40.976|6    |2018-11-27 13:12:40.976| 
> [info] |10   |2018-11-27 13:12:44.976|10   |2018-11-27 13:12:44.976| 
> [info] |8    |2018-11-27 13:12:42.976|8    |2018-11-27 13:12:42.976| 
> [info] |2    |2018-11-27 13:12:36.976|2    |2018-11-27 13:12:36.976| 
> [info] |4    |2018-11-27 13:12:38.976|4    |2018-11-27 13:12:38.976| 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] Batch: 2 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |fooId|fooTime                |barId|barTime                | 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |1    |2018-11-27 13:12:35.976|null |null                   | 
> [info] |3    |2018-11-27 13:12:37.976|null |null                   | 
> [info] |12   |2018-11-27 13:12:46.976|12   |2018-11-27 13:12:46.976| 
> [info] |18   |2018-11-27 13:12:52.976|18   |2018-11-27 13:12:52.976| 
> [info] |14   |2018-11-27 13:12:48.976|14   |2018-11-27 13:12:48.976| 
> [info] |20   |2018-11-27 13:12:54.976|20   |2018-11-27 13:12:54.976| 
> [info] |16   |2018-11-27 13:12:50.976|16   |2018-11-27 13:12:50.976| 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] Batch: 3 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |fooId|fooTime                |barId|barTime                | 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |26   |2018-11-27 13:13:00.976|26   |2018-11-27 13:13:00.976| 
> [info] |22   |2018-11-27 13:12:56.976|22   |2018-11-27 13:12:56.976| 
> [info] |7    |2018-11-27 13:12:41.976|null |null                   | 
> [info] |9    |2018-11-27 13:12:43.976|null |null                   | 
> [info] |28   |2018-11-27 13:13:02.976|28   |2018-11-27 13:13:02.976| 
> [info] |5    |2018-11-27 13:12:39.976|null |null                   | 
> [info] |11   |2018-11-27 13:12:45.976|null |null                   | 
> [info] |13   |2018-11-27 13:12:47.976|null |null                   | 
> [info] |24   |2018-11-27 13:12:58.976|24   |2018-11-27 13:12:58.976| 
> [info] +-----+-----------------------+-----+-----------------------+
> {code}
> On the other hand, if we swap the ordering so that the time-range condition requires each fooStream event to occur no later than its matching barStream event:
> {code:java}
> val query = fooStream
>  .withWatermark("fooTime", "5 seconds")
>  .join(
>    barStream.withWatermark("barTime", "5 seconds"),
>    expr("""
>      barId = fooId AND
>      barTime >= fooTime AND
>      barTime <= fooTime + interval 5 seconds
>         """),
>    joinType = "leftOuter"
>  )
>  .writeStream
>  .format("console")
>  .option("truncate", false)
>  .start(){code}
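Because both streams come from the same rate row, fooTime equals barTime for every even ID, so this condition accepts exactly the same pairs as the first one, and both queries should produce the same rows. One plausible reason the ordering still matters is the watermark at which each side's state can be dropped. The sketch below is a hypothetical, simplified model (all names are illustrative, not Spark APIs). It assumes that an evicted foo row is null-padded unless a matching bar row is still in state, and that this check runs before bar-side cleanup within the same batch. Under those assumptions, condition 1 keeps bar rows longer than the foo rows they matched, while condition 2 drops them 5 seconds earlier. Whether a matched pair gets a spurious NULL row then depends on whether both thresholds are crossed in the same batch, which would account for "some, not all".

```scala
// Hypothetical model of watermark-based state eviction in a stream-stream left outer join.
// Times are in milliseconds; the range width in both conditions is 5 seconds.
object EvictionModel {
  val RangeMs: Long = 5000L

  // Watermark past which a foo row at time t can no longer match future bar rows
  // (future rows are assumed to have event time >= watermark).
  // Condition 1 (barTime <= fooTime <= barTime + 5 s) needs barTime in [t - 5 s, t].
  // Condition 2 (fooTime <= barTime <= fooTime + 5 s) needs barTime in [t, t + 5 s].
  def fooEvictAfter(cond: Int, t: Long): Long = if (cond == 1) t else t + RangeMs

  // Same for a bar row at time t: condition 1 needs fooTime in [t, t + 5 s],
  // condition 2 needs fooTime in [t - 5 s, t].
  def barEvictAfter(cond: Int, t: Long): Long = if (cond == 1) t + RangeMs else t

  // Index of the first batch whose watermark is strictly past the threshold, if any.
  def evictionBatch(watermarks: Seq[Long], threshold: Long): Option[Int] = {
    val i = watermarks.indexWhere(_ > threshold)
    if (i < 0) None else Some(i)
  }

  // A matched pair (fooTime == barTime == t) gets a spurious NULL row exactly when the
  // bar row is evicted in an earlier batch than the foo row (under the assumptions above).
  def spuriousNull(cond: Int, t: Long, watermarks: Seq[Long]): Boolean =
    (evictionBatch(watermarks, barEvictAfter(cond, t)), evictionBatch(watermarks, fooEvictAfter(cond, t))) match {
      case (Some(barBatch), Some(fooBatch)) => barBatch < fooBatch
      case _                                => false
    }
}
```

With made-up per-batch watermarks `Seq(0L, 10000L, 20000L, 30000L)` and pairs at t = 0, 2000, 4000, 6000 and 8000, condition 1 flags no pair, while condition 2 flags only the pairs at 6000 and 8000. These watermark values are illustrative, not taken from the run above.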
> Some, but not all, previously matched records (with even IDs) are emitted a second time with outer NULLs, in addition to all unmatched records (with odd IDs):
> {code:java}
> [info] Batch: 1 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |fooId|fooTime                |barId|barTime                | 
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |0    |2018-11-27 13:26:11.463|0    |2018-11-27 13:26:11.463| 
> [info] |6    |2018-11-27 13:26:17.463|6    |2018-11-27 13:26:17.463| 
> [info] |10   |2018-11-27 13:26:21.463|10   |2018-11-27 13:26:21.463| 
> [info] |8    |2018-11-27 13:26:19.463|8    |2018-11-27 13:26:19.463| 
> [info] |2    |2018-11-27 13:26:13.463|2    |2018-11-27 13:26:13.463| 
> [info] |4    |2018-11-27 13:26:15.463|4    |2018-11-27 13:26:15.463| 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] Batch: 2 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |fooId|fooTime                |barId|barTime                | 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |12   |2018-11-27 13:26:23.463|12   |2018-11-27 13:26:23.463| 
> [info] |18   |2018-11-27 13:26:29.463|18   |2018-11-27 13:26:29.463| 
> [info] |14   |2018-11-27 13:26:25.463|14   |2018-11-27 13:26:25.463| 
> [info] |20   |2018-11-27 13:26:31.463|20   |2018-11-27 13:26:31.463| 
> [info] |16   |2018-11-27 13:26:27.463|16   |2018-11-27 13:26:27.463| 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] Batch: 3
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |fooId|fooTime                |barId|barTime                | 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |26   |2018-11-27 13:26:37.463|26   |2018-11-27 13:26:37.463| 
> [info] |0    |2018-11-27 13:26:11.463|null |null                   | 
> [info] |22   |2018-11-27 13:26:33.463|22   |2018-11-27 13:26:33.463| 
> [info] |7    |2018-11-27 13:26:18.463|null |null                   | 
> [info] |9    |2018-11-27 13:26:20.463|null |null                   | 
> [info] |28   |2018-11-27 13:26:39.463|28   |2018-11-27 13:26:39.463| 
> [info] |5    |2018-11-27 13:26:16.463|null |null                   | 
> [info] |1    |2018-11-27 13:26:12.463|null |null                   | 
> [info] |3    |2018-11-27 13:26:14.463|null |null                   | 
> [info] |2    |2018-11-27 13:26:13.463|null |null                   | 
> [info] |4    |2018-11-27 13:26:15.463|null |null                   | 
> [info] |30   |2018-11-27 13:26:41.463|30   |2018-11-27 13:26:41.463| 
> [info] |24   |2018-11-27 13:26:35.463|24   |2018-11-27 13:26:35.463| 
> [info] +-----+-----------------------+-----+-----------------------+
> [info] Batch: 4 
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |fooId|fooTime                |barId|barTime                | 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |19   |2018-11-27 13:26:30.463|null |null                   | 
> [info] |34   |2018-11-27 13:26:45.463|34   |2018-11-27 13:26:45.463|
> [info] |32   |2018-11-27 13:26:43.463|32   |2018-11-27 13:26:43.463|
> [info] |17   |2018-11-27 13:26:28.463|null |null                   |
> [info] |10   |2018-11-27 13:26:21.463|null |null                   |
> [info] |12   |2018-11-27 13:26:23.463|null |null                   |
> [info] |11   |2018-11-27 13:26:22.463|null |null                   |
> [info] |13   |2018-11-27 13:26:24.463|null |null                   |
> [info] |36   |2018-11-27 13:26:47.463|36   |2018-11-27 13:26:47.463|
> [info] |14   |2018-11-27 13:26:25.463|null |null                   |
> [info] |15   |2018-11-27 13:26:26.463|null |null                   |
> [info] |38   |2018-11-27 13:26:49.463|38   |2018-11-27 13:26:49.463|
> [info] |40   |2018-11-27 13:26:51.463|40   |2018-11-27 13:26:51.463|
> [info] +-----+-----------------------+-----+-----------------------+
> {code}
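To make the violation explicit, the output above can be checked against the left outer join invariant: each left row is emitted either matched or with NULLs, never both. This minimal sketch (the `OuterJoinInvariant` object is hypothetical) only transcribes each fooId from batches 1 to 4 of the second query together with whether its barId was non-null.

```scala
// Checks that no fooId is emitted both matched and null-padded.
object OuterJoinInvariant {
  // rows: (fooId, matched), where matched means barId was not null.
  def violations(rows: Seq[(Long, Boolean)]): Set[Long] = {
    val matched = rows.collect { case (id, true) => id }.toSet
    val nullPadded = rows.collect { case (id, false) => id }.toSet
    matched.intersect(nullPadded)
  }

  // fooIds transcribed from batches 1-4 of the second query's console output.
  val observed: Seq[(Long, Boolean)] =
    Seq(0L, 6L, 10L, 8L, 2L, 4L, 12L, 18L, 14L, 20L, 16L, 26L, 22L, 28L, 30L, 24L,
      34L, 32L, 36L, 38L, 40L).map(id => id -> true) ++
      Seq(0L, 7L, 9L, 5L, 1L, 3L, 2L, 4L, 19L, 17L, 10L, 12L, 11L, 13L, 14L, 15L).map(id => id -> false)
}
```

`OuterJoinInvariant.violations(OuterJoinInvariant.observed)` returns `Set(0, 2, 4, 10, 12, 14)`. Those IDs were first emitted matched and later again with outer NULLs. The same check on the first query's output finds no violations.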



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
