Posted to issues@spark.apache.org by "Jungtaek Lim (JIRA)" <ji...@apache.org> on 2019/01/25 00:48:00 UTC
[jira] [Resolved] (SPARK-26187) Stream-stream left outer join returns outer nulls for already matched rows
[ https://issues.apache.org/jira/browse/SPARK-26187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jungtaek Lim resolved SPARK-26187.
----------------------------------
Resolution: Duplicate
> Stream-stream left outer join returns outer nulls for already matched rows
> --------------------------------------------------------------------------
>
> Key: SPARK-26187
> URL: https://issues.apache.org/jira/browse/SPARK-26187
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.2
> Reporter: Pavel Chernikov
> Priority: Major
>
> This is basically the same issue as SPARK-26154, but with a more concrete example that is slightly easier to reproduce:
> {code:java}
> import org.apache.spark.sql.functions._
>
> // `session` is assumed to be an existing SparkSession
> val rateStream = session.readStream
>   .format("rate")
>   .option("rowsPerSecond", 1)
>   .option("numPartitions", 1)
>   .load()
>
> val fooStream = rateStream
>   .select(col("value").as("fooId"), col("timestamp").as("fooTime"))
>
> val barStream = rateStream
>   // Introduce misses for ease of debugging
>   .where(col("value") % 2 === 0)
>   .select(col("value").as("barId"), col("timestamp").as("barTime"))
> {code}
> If the time range condition is written so that barStream rows occur earlier than (or at the same time as) the matching fooStream rows, then everything is all right: no previously matched records are flushed with outer NULLs:
> {code:java}
> val query = fooStream
>   .withWatermark("fooTime", "5 seconds")
>   .join(
>     barStream.withWatermark("barTime", "5 seconds"),
>     expr("""
>       barId = fooId AND
>       fooTime >= barTime AND
>       fooTime <= barTime + interval 5 seconds
>     """),
>     joinType = "leftOuter"
>   )
>   .writeStream
>   .format("console")
>   .option("truncate", false)
>   .start()
> {code}
> It's easy to observe that only rows with odd IDs, which have no match in barStream, are flushed with NULLs on the right:
> {code:java}
> [info] Batch: 1
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |fooId|fooTime                |barId|barTime                |
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |0    |2018-11-27 13:12:34.976|0    |2018-11-27 13:12:34.976|
> [info] |6    |2018-11-27 13:12:40.976|6    |2018-11-27 13:12:40.976|
> [info] |10   |2018-11-27 13:12:44.976|10   |2018-11-27 13:12:44.976|
> [info] |8    |2018-11-27 13:12:42.976|8    |2018-11-27 13:12:42.976|
> [info] |2    |2018-11-27 13:12:36.976|2    |2018-11-27 13:12:36.976|
> [info] |4    |2018-11-27 13:12:38.976|4    |2018-11-27 13:12:38.976|
> [info] +-----+-----------------------+-----+-----------------------+
> [info] Batch: 2
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |fooId|fooTime                |barId|barTime                |
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |1    |2018-11-27 13:12:35.976|null |null                   |
> [info] |3    |2018-11-27 13:12:37.976|null |null                   |
> [info] |12   |2018-11-27 13:12:46.976|12   |2018-11-27 13:12:46.976|
> [info] |18   |2018-11-27 13:12:52.976|18   |2018-11-27 13:12:52.976|
> [info] |14   |2018-11-27 13:12:48.976|14   |2018-11-27 13:12:48.976|
> [info] |20   |2018-11-27 13:12:54.976|20   |2018-11-27 13:12:54.976|
> [info] |16   |2018-11-27 13:12:50.976|16   |2018-11-27 13:12:50.976|
> [info] +-----+-----------------------+-----+-----------------------+
> [info] Batch: 3
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |fooId|fooTime                |barId|barTime                |
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |26   |2018-11-27 13:13:00.976|26   |2018-11-27 13:13:00.976|
> [info] |22   |2018-11-27 13:12:56.976|22   |2018-11-27 13:12:56.976|
> [info] |7    |2018-11-27 13:12:41.976|null |null                   |
> [info] |9    |2018-11-27 13:12:43.976|null |null                   |
> [info] |28   |2018-11-27 13:13:02.976|28   |2018-11-27 13:13:02.976|
> [info] |5    |2018-11-27 13:12:39.976|null |null                   |
> [info] |11   |2018-11-27 13:12:45.976|null |null                   |
> [info] |13   |2018-11-27 13:12:47.976|null |null                   |
> [info] |24   |2018-11-27 13:12:58.976|24   |2018-11-27 13:12:58.976|
> [info] +-----+-----------------------+-----+-----------------------+
> {code}
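The time range condition in the query above reads as: a bar row matches if it carries the same timestamp as its foo row or is up to 5 seconds older. The following is only a minimal sketch in plain Scala (no Spark involved) that evaluates the same predicate; `TimeRangeCondition`, `withinWindow` and the shifted sample timestamps are illustrative names and values, not part of the reported code.

```scala
import java.time.{Duration, LocalDateTime}

object TimeRangeCondition {
  // Mirrors the SQL predicate: later >= earlier AND later <= earlier + window
  def withinWindow(earlier: LocalDateTime, later: LocalDateTime, window: Duration): Boolean =
    !later.isBefore(earlier) && !later.isAfter(earlier.plus(window))

  def main(args: Array[String]): Unit = {
    val window = Duration.ofSeconds(5)
    // Timestamp of row 0 in the first run; both streams derive from the same
    // rate source, so a matching foo/bar pair shares this timestamp.
    val barTime = LocalDateTime.parse("2018-11-27T13:12:34.976")

    println(withinWindow(barTime, barTime, window))                 // true: equal timestamps
    println(withinWindow(barTime, barTime.plusSeconds(5), window))  // true: upper bound is inclusive
    println(withinWindow(barTime, barTime.plusSeconds(6), window))  // false: outside the 5 second window
    println(withinWindow(barTime, barTime.minusSeconds(1), window)) // false: fooTime before barTime
  }
}
```

For this query, `earlier` plays the role of barTime and `later` the role of fooTime.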
> On the other hand, if we switch the ordering in the time range condition so that fooStream rows now occur earlier than the matching barStream rows:
> {code:java}
> val query = fooStream
>   .withWatermark("fooTime", "5 seconds")
>   .join(
>     barStream.withWatermark("barTime", "5 seconds"),
>     expr("""
>       barId = fooId AND
>       barTime >= fooTime AND
>       barTime <= fooTime + interval 5 seconds
>     """),
>     joinType = "leftOuter"
>   )
>   .writeStream
>   .format("console")
>   .option("truncate", false)
>   .start()
> {code}
> Some, but not all, previously matched records (with even IDs) are emitted again with outer NULLs, along with all unmatched records (with odd IDs):
> {code:java}
> [info] Batch: 1
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |fooId|fooTime                |barId|barTime                |
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |0    |2018-11-27 13:26:11.463|0    |2018-11-27 13:26:11.463|
> [info] |6    |2018-11-27 13:26:17.463|6    |2018-11-27 13:26:17.463|
> [info] |10   |2018-11-27 13:26:21.463|10   |2018-11-27 13:26:21.463|
> [info] |8    |2018-11-27 13:26:19.463|8    |2018-11-27 13:26:19.463|
> [info] |2    |2018-11-27 13:26:13.463|2    |2018-11-27 13:26:13.463|
> [info] |4    |2018-11-27 13:26:15.463|4    |2018-11-27 13:26:15.463|
> [info] +-----+-----------------------+-----+-----------------------+
> [info] Batch: 2
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |fooId|fooTime                |barId|barTime                |
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |12   |2018-11-27 13:26:23.463|12   |2018-11-27 13:26:23.463|
> [info] |18   |2018-11-27 13:26:29.463|18   |2018-11-27 13:26:29.463|
> [info] |14   |2018-11-27 13:26:25.463|14   |2018-11-27 13:26:25.463|
> [info] |20   |2018-11-27 13:26:31.463|20   |2018-11-27 13:26:31.463|
> [info] |16   |2018-11-27 13:26:27.463|16   |2018-11-27 13:26:27.463|
> [info] +-----+-----------------------+-----+-----------------------+
> [info] Batch: 3
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |fooId|fooTime                |barId|barTime                |
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |26   |2018-11-27 13:26:37.463|26   |2018-11-27 13:26:37.463|
> [info] |0    |2018-11-27 13:26:11.463|null |null                   |
> [info] |22   |2018-11-27 13:26:33.463|22   |2018-11-27 13:26:33.463|
> [info] |7    |2018-11-27 13:26:18.463|null |null                   |
> [info] |9    |2018-11-27 13:26:20.463|null |null                   |
> [info] |28   |2018-11-27 13:26:39.463|28   |2018-11-27 13:26:39.463|
> [info] |5    |2018-11-27 13:26:16.463|null |null                   |
> [info] |1    |2018-11-27 13:26:12.463|null |null                   |
> [info] |3    |2018-11-27 13:26:14.463|null |null                   |
> [info] |2    |2018-11-27 13:26:13.463|null |null                   |
> [info] |4    |2018-11-27 13:26:15.463|null |null                   |
> [info] |30   |2018-11-27 13:26:41.463|30   |2018-11-27 13:26:41.463|
> [info] |24   |2018-11-27 13:26:35.463|24   |2018-11-27 13:26:35.463|
> [info] +-----+-----------------------+-----+-----------------------+
> [info] Batch: 4
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |fooId|fooTime                |barId|barTime                |
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |19   |2018-11-27 13:26:30.463|null |null                   |
> [info] |34   |2018-11-27 13:26:45.463|34   |2018-11-27 13:26:45.463|
> [info] |32   |2018-11-27 13:26:43.463|32   |2018-11-27 13:26:43.463|
> [info] |17   |2018-11-27 13:26:28.463|null |null                   |
> [info] |10   |2018-11-27 13:26:21.463|null |null                   |
> [info] |12   |2018-11-27 13:26:23.463|null |null                   |
> [info] |11   |2018-11-27 13:26:22.463|null |null                   |
> [info] |13   |2018-11-27 13:26:24.463|null |null                   |
> [info] |36   |2018-11-27 13:26:47.463|36   |2018-11-27 13:26:47.463|
> [info] |14   |2018-11-27 13:26:25.463|null |null                   |
> [info] |15   |2018-11-27 13:26:26.463|null |null                   |
> [info] |38   |2018-11-27 13:26:49.463|38   |2018-11-27 13:26:49.463|
> [info] |40   |2018-11-27 13:26:51.463|40   |2018-11-27 13:26:51.463|
> [info] +-----+-----------------------+-----+-----------------------+
> {code}
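One way to spot the anomaly in console output like the second run is to look for fooId values that appear both with a match and with an outer NULL. The following is only a sketch in plain Scala under that assumption; `OuterNullCheck`, `JoinedKeys` and `matchedThenNulled` are hypothetical helper names, and the sample data is copied from Batch 1 and the NULL rows of Batch 3 of the second run.

```scala
object OuterNullCheck {
  // One output row reduced to its join keys; barId is None for an outer-NULL row.
  final case class JoinedKeys(fooId: Long, barId: Option[Long])

  // fooIds emitted at least once with a match and at least once with an outer NULL.
  def matchedThenNulled(rows: Seq[JoinedKeys]): List[Long] = {
    val matched = rows.collect { case JoinedKeys(id, Some(_)) => id }.toSet
    val nulled = rows.collect { case JoinedKeys(id, None) => id }.toSet
    (matched intersect nulled).toList.sorted
  }

  def main(args: Array[String]): Unit = {
    // Batch 1 of the second run: every row is matched.
    val batch1 = Seq(0L, 6L, 10L, 8L, 2L, 4L).map(id => JoinedKeys(id, Some(id)))
    // Rows of Batch 3 of the second run that carry outer NULLs.
    val batch3Nulls = Seq(0L, 7L, 9L, 5L, 1L, 3L, 2L, 4L).map(id => JoinedKeys(id, None))

    println(matchedThenNulled(batch1 ++ batch3Nulls)) // List(0, 2, 4)
  }
}
```

For a correct left outer join this list should be empty, since a left row that has already been matched should not be emitted again with NULLs.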
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)