Posted to issues@spark.apache.org by "Pavel Chernikov (JIRA)" <ji...@apache.org> on 2018/11/27 18:36:00 UTC

[jira] [Created] (SPARK-26187) Stream-stream left outer join returns outer nulls for already matched rows

Pavel Chernikov created SPARK-26187:
---------------------------------------

             Summary: Stream-stream left outer join returns outer nulls for already matched rows
                 Key: SPARK-26187
                 URL: https://issues.apache.org/jira/browse/SPARK-26187
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.3.2
            Reporter: Pavel Chernikov


This is essentially the same issue as [SPARK-26154|https://issues.apache.org/jira/browse/SPARK-26154], but with a more concrete and more easily reproducible example:



 
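{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Assumed local session; the original report does not show how `session` was created
val session = SparkSession.builder()
 .master("local[*]")
 .appName("SPARK-26187")
 .getOrCreate()

// The "rate" source emits (timestamp, value) rows: one row per second, in a single partition
val rateStream = session.readStream
 .format("rate")
 .option("rowsPerSecond", 1)
 .option("numPartitions", 1)
 .load()

val fooStream = rateStream
 .select(col("value").as("fooId"), col("timestamp").as("fooTime"))

val barStream = rateStream
 // Keep only even values so that odd fooIds never find a match (easier to debug)
 .where(col("value") % 2 === 0)
 .select(col("value").as("barId"), col("timestamp").as("barTime")){code}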
If the time-range condition requires each barStream event to occur no later than its fooStream counterpart, everything works as expected: no previously matched records are emitted again with outer NULLs:

 
{code:java}
val query = fooStream
 .withWatermark("fooTime", "5 seconds")
 .join(
   barStream.withWatermark("barTime", "5 seconds"),
   // Time-range condition: the bar event occurs at or before the foo event, at most 5 seconds earlier
   expr("""
     barId = fooId AND
     fooTime >= barTime AND
     fooTime <= barTime + interval 5 seconds
        """),
   joinType = "leftOuter"
 )
 .writeStream
 .format("console")
 .option("truncate", false)
 .start(){code}
As expected, only the odd (unmatched) rows are emitted with NULLs on the right:
{code:java}
[info] Batch: 1 
[info] +-----+-----------------------+-----+-----------------------+ 
[info] |fooId|fooTime                |barId|barTime                | 
[info] +-----+-----------------------+-----+-----------------------+ 
[info] |0    |2018-11-27 13:12:34.976|0    |2018-11-27 13:12:34.976| 
[info] |6    |2018-11-27 13:12:40.976|6    |2018-11-27 13:12:40.976| 
[info] |10   |2018-11-27 13:12:44.976|10   |2018-11-27 13:12:44.976| 
[info] |8    |2018-11-27 13:12:42.976|8    |2018-11-27 13:12:42.976| 
[info] |2    |2018-11-27 13:12:36.976|2    |2018-11-27 13:12:36.976| 
[info] |4    |2018-11-27 13:12:38.976|4    |2018-11-27 13:12:38.976| 
[info] +-----+-----------------------+-----+-----------------------+ 
[info] Batch: 2 
[info] +-----+-----------------------+-----+-----------------------+ 
[info] |fooId|fooTime                |barId|barTime                | 
[info] +-----+-----------------------+-----+-----------------------+ 
[info] |1    |2018-11-27 13:12:35.976|null |null                   | 
[info] |3    |2018-11-27 13:12:37.976|null |null                   | 
[info] |12   |2018-11-27 13:12:46.976|12   |2018-11-27 13:12:46.976| 
[info] |18   |2018-11-27 13:12:52.976|18   |2018-11-27 13:12:52.976| 
[info] |14   |2018-11-27 13:12:48.976|14   |2018-11-27 13:12:48.976| 
[info] |20   |2018-11-27 13:12:54.976|20   |2018-11-27 13:12:54.976| 
[info] |16   |2018-11-27 13:12:50.976|16   |2018-11-27 13:12:50.976| 
[info] +-----+-----------------------+-----+-----------------------+ 
[info] Batch: 3 
[info] +-----+-----------------------+-----+-----------------------+ 
[info] |fooId|fooTime                |barId|barTime                | 
[info] +-----+-----------------------+-----+-----------------------+ 
[info] |26   |2018-11-27 13:13:00.976|26   |2018-11-27 13:13:00.976| 
[info] |22   |2018-11-27 13:12:56.976|22   |2018-11-27 13:12:56.976| 
[info] |7    |2018-11-27 13:12:41.976|null |null                   | 
[info] |9    |2018-11-27 13:12:43.976|null |null                   | 
[info] |28   |2018-11-27 13:13:02.976|28   |2018-11-27 13:13:02.976| 
[info] |5    |2018-11-27 13:12:39.976|null |null                   | 
[info] |11   |2018-11-27 13:12:45.976|null |null                   | 
[info] |13   |2018-11-27 13:12:47.976|null |null                   | 
[info] |24   |2018-11-27 13:12:58.976|24   |2018-11-27 13:12:58.976| 
[info] +-----+-----------------------+-----+-----------------------+
{code}
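To make the expected semantics concrete, here is a minimal, Spark-free Scala sketch of a batch (non-streaming) left outer join over the same kind of input. It is a model under stated assumptions, not Spark's implementation: it assumes the rate source emits value n at T0 + n seconds, models timestamps as epoch milliseconds, and the names `Foo`, `Bar` and `ReferenceLeftOuterJoin` are hypothetical.

```scala
// Hypothetical, Spark-free model of the two inputs; timestamps are epoch milliseconds.
final case class Foo(fooId: Long, fooTime: Long)
final case class Bar(barId: Long, barTime: Long)

object ReferenceLeftOuterJoin {
  val T0 = 0L // arbitrary start time (assumption)
  val FiveSeconds = 5000L

  // Assumed rate-source behaviour: value n is produced at T0 + n seconds.
  def foos(n: Int): Seq[Foo] = (0 until n).map(i => Foo(i.toLong, T0 + i * 1000L))

  // barStream keeps only even values, mirroring the where clause of the repro.
  def bars(n: Int): Seq[Bar] =
    (0 until n).filter(_ % 2 == 0).map(i => Bar(i.toLong, T0 + i * 1000L))

  // First query's condition: bar occurs at or before foo, at most 5 seconds earlier.
  def barFirst(f: Foo, b: Bar): Boolean =
    b.barId == f.fooId && f.fooTime >= b.barTime && f.fooTime <= b.barTime + FiveSeconds

  // Reversed condition: foo occurs at or before bar, at most 5 seconds earlier.
  def fooFirst(f: Foo, b: Bar): Boolean =
    b.barId == f.fooId && b.barTime >= f.fooTime && b.barTime <= f.fooTime + FiveSeconds

  // Batch left outer join: each foo row is paired with every matching bar row,
  // or appears exactly once with None when nothing matches.
  def leftOuter(fs: Seq[Foo], bs: Seq[Bar], cond: (Foo, Bar) => Boolean): Seq[(Foo, Option[Bar])] =
    fs.flatMap { f =>
      val matches: Seq[Option[Bar]] = bs.filter(b => cond(f, b)).map(b => Option(b))
      val right: Seq[Option[Bar]] = if (matches.isEmpty) Seq(Option.empty[Bar]) else matches
      right.map(b => (f, b))
    }
}
```

With this model, `leftOuter(foos(14), bars(14), barFirst)` yields exactly one row per fooId: even IDs carry their matching bar row and odd IDs carry `None`, which is consistent with the console output above. Using `fooFirst` instead gives the same result, because every even fooId has a bar row with an identical timestamp, so under either condition no even fooId should ever be emitted with NULLs.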
However, if we reverse the ordering so that, according to the time-range condition, each fooStream event occurs no later than its barStream counterpart:

 
{code:java}
val query = fooStream
 .withWatermark("fooTime", "5 seconds")
 .join(
   barStream.withWatermark("barTime", "5 seconds"),
   // Reversed condition: the foo event occurs at or before the bar event, at most 5 seconds earlier
   expr("""
     barId = fooId AND
     barTime >= fooTime AND
     barTime <= fooTime + interval 5 seconds
        """),
   joinType = "leftOuter"
 )
 .writeStream
 .format("console")
 .option("truncate", false)
 .start(){code}
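Some, but not all, previously matched records (with even IDs) are emitted again with outer NULLs, along with all unmatched records (with odd IDs). For example, rows 0, 2 and 4 are matched in Batch 1 but reappear with NULLs in Batch 3, and rows 10, 12 and 14 (matched in Batches 1 and 2) reappear with NULLs in Batch 4: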

 
{code:java}
[info] Batch: 1 
[info] +-----+-----------------------+-----+-----------------------+ 
[info] |fooId|fooTime                |barId|barTime                | 
[info] +-----+-----------------------+-----+-----------------------+
[info] |0    |2018-11-27 13:26:11.463|0    |2018-11-27 13:26:11.463| 
[info] |6    |2018-11-27 13:26:17.463|6    |2018-11-27 13:26:17.463| 
[info] |10   |2018-11-27 13:26:21.463|10   |2018-11-27 13:26:21.463| 
[info] |8    |2018-11-27 13:26:19.463|8    |2018-11-27 13:26:19.463| 
[info] |2    |2018-11-27 13:26:13.463|2    |2018-11-27 13:26:13.463| 
[info] |4    |2018-11-27 13:26:15.463|4    |2018-11-27 13:26:15.463| 
[info] +-----+-----------------------+-----+-----------------------+ 
[info] Batch: 2 
[info] +-----+-----------------------+-----+-----------------------+ 
[info] |fooId|fooTime                |barId|barTime                | 
[info] +-----+-----------------------+-----+-----------------------+ 
[info] |12   |2018-11-27 13:26:23.463|12   |2018-11-27 13:26:23.463| 
[info] |18   |2018-11-27 13:26:29.463|18   |2018-11-27 13:26:29.463| 
[info] |14   |2018-11-27 13:26:25.463|14   |2018-11-27 13:26:25.463| 
[info] |20   |2018-11-27 13:26:31.463|20   |2018-11-27 13:26:31.463| 
[info] |16   |2018-11-27 13:26:27.463|16   |2018-11-27 13:26:27.463| 
[info] +-----+-----------------------+-----+-----------------------+ 
[info] Batch: 3
[info] +-----+-----------------------+-----+-----------------------+ 
[info] |fooId|fooTime                |barId|barTime                | 
[info] +-----+-----------------------+-----+-----------------------+ 
[info] |26   |2018-11-27 13:26:37.463|26   |2018-11-27 13:26:37.463| 
[info] |0    |2018-11-27 13:26:11.463|null |null                   | 
[info] |22   |2018-11-27 13:26:33.463|22   |2018-11-27 13:26:33.463| 
[info] |7    |2018-11-27 13:26:18.463|null |null                   | 
[info] |9    |2018-11-27 13:26:20.463|null |null                   | 
[info] |28   |2018-11-27 13:26:39.463|28   |2018-11-27 13:26:39.463| 
[info] |5    |2018-11-27 13:26:16.463|null |null                   | 
[info] |1    |2018-11-27 13:26:12.463|null |null                   | 
[info] |3    |2018-11-27 13:26:14.463|null |null                   | 
[info] |2    |2018-11-27 13:26:13.463|null |null                   | 
[info] |4    |2018-11-27 13:26:15.463|null |null                   | 
[info] |30   |2018-11-27 13:26:41.463|30   |2018-11-27 13:26:41.463| 
[info] |24   |2018-11-27 13:26:35.463|24   |2018-11-27 13:26:35.463| 
[info] +-----+-----------------------+-----+-----------------------+
[info] Batch: 4 
[info] +-----+-----------------------+-----+-----------------------+
[info] |fooId|fooTime                |barId|barTime                | 
[info] +-----+-----------------------+-----+-----------------------+ 
[info] |19   |2018-11-27 13:26:30.463|null |null                   | 
[info] |34   |2018-11-27 13:26:45.463|34   |2018-11-27 13:26:45.463|
[info] |32   |2018-11-27 13:26:43.463|32   |2018-11-27 13:26:43.463|
[info] |17   |2018-11-27 13:26:28.463|null |null                   |
[info] |10   |2018-11-27 13:26:21.463|null |null                   |
[info] |12   |2018-11-27 13:26:23.463|null |null                   |
[info] |11   |2018-11-27 13:26:22.463|null |null                   |
[info] |13   |2018-11-27 13:26:24.463|null |null                   |
[info] |36   |2018-11-27 13:26:47.463|36   |2018-11-27 13:26:47.463|
[info] |14   |2018-11-27 13:26:25.463|null |null                   |
[info] |15   |2018-11-27 13:26:26.463|null |null                   |
[info] |38   |2018-11-27 13:26:49.463|38   |2018-11-27 13:26:49.463|
[info] |40   |2018-11-27 13:26:51.463|40   |2018-11-27 13:26:51.463|
[info] +-----+-----------------------+-----+-----------------------+
{code}
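One way to spot the problem mechanically is to reduce every emitted row to (fooId, barId) and look for fooIds that were emitted both with a match and with outer NULLs. The Scala sketch below does this over rows transcribed by hand from the two console outputs in this report; it is a plain-collections check, not a Spark API, and the object and helper names are hypothetical.

```scala
// Hypothetical checker: each console row reduced to (fooId, barId); None stands for the outer NULLs.
object DuplicateOuterRows {
  type Row = (Long, Option[Long])

  // Helpers for transcribing the console tables (barId == fooId whenever a match exists).
  def matched(ids: Long*): Seq[Row] = ids.map(id => (id, Option(id)))
  def nulls(ids: Long*): Seq[Row] = ids.map(id => (id, Option.empty[Long]))

  // fooIds emitted with a bar match in some batch and with outer NULLs in some batch.
  // For a correct left outer join this set should be empty.
  def matchedAndNulled(batches: Seq[Seq[Row]]): Set[Long] = {
    val rows = batches.flatten
    val withMatch = rows.collect { case (id, Some(_)) => id }.toSet
    val withNulls = rows.collect { case (id, None) => id }.toSet
    withMatch intersect withNulls
  }

  // First query (bar happens first), Batches 1 to 3
  val barFirstBatches: Seq[Seq[Row]] = Seq(
    matched(0, 6, 10, 8, 2, 4),
    nulls(1, 3) ++ matched(12, 18, 14, 20, 16),
    matched(26, 22, 28, 24) ++ nulls(7, 9, 5, 11, 13)
  )

  // Second query (foo happens first), Batches 1 to 4
  val fooFirstBatches: Seq[Seq[Row]] = Seq(
    matched(0, 6, 10, 8, 2, 4),
    matched(12, 18, 14, 20, 16),
    matched(26, 22, 28, 30, 24) ++ nulls(0, 7, 9, 5, 1, 3, 2, 4),
    matched(34, 32, 36, 38, 40) ++ nulls(19, 17, 10, 12, 11, 13, 14, 15)
  )
}
```

For the first query `matchedAndNulled(barFirstBatches)` is empty, while for the second `matchedAndNulled(fooFirstBatches)` contains exactly 0, 2, 4, 10, 12 and 14: the even rows that were re-emitted with NULLs after having already been matched. Note that rows 6 and 8, also matched in Batch 1, do not show up again in these four batches.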



