Posted to issues@spark.apache.org by "Pavel Chernikov (JIRA)" <ji...@apache.org> on 2018/11/27 18:36:00 UTC
[jira] [Created] (SPARK-26187) Stream-stream left outer join returns outer nulls for already matched rows
Pavel Chernikov created SPARK-26187:
---------------------------------------
Summary: Stream-stream left outer join returns outer nulls for already matched rows
Key: SPARK-26187
URL: https://issues.apache.org/jira/browse/SPARK-26187
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 2.3.2
Reporter: Pavel Chernikov
This is basically the same issue as [SPARK-26154|https://issues.apache.org/jira/browse/SPARK-26154], but with a more easily reproducible, concrete example:
{code:java}
// `session` is an existing SparkSession
import org.apache.spark.sql.functions._

val rateStream = session.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .option("numPartitions", 1)
  .load()

val fooStream = rateStream
  .select(col("value").as("fooId"), col("timestamp").as("fooTime"))

val barStream = rateStream
  // Keep only even values so odd fooIds never match (misses for ease of debugging)
  .where(col("value") % 2 === 0)
  .select(col("value").as("barId"), col("timestamp").as("barTime"))
{code}
If the time range condition places barStream rows at or before their fooStream matches (barTime <= fooTime <= barTime + 5 seconds), everything works as expected: no previously matched records are emitted again with outer NULLs:
{code:java}
val query = fooStream
  .withWatermark("fooTime", "5 seconds")
  .join(
    barStream.withWatermark("barTime", "5 seconds"),
    expr("""
      barId = fooId AND
      fooTime >= barTime AND
      fooTime <= barTime + interval 5 seconds
    """),
    joinType = "leftOuter"
  )
  .writeStream
  .format("console")
  .option("truncate", false)
  .start()
{code}
Only the odd IDs, which have no counterpart in barStream, are emitted with NULLs on the right:
{code:java}
[info] Batch: 1
[info] +-----+-----------------------+-----+-----------------------+
[info] |fooId|fooTime |barId|barTime |
[info] +-----+-----------------------+-----+-----------------------+
[info] |0 |2018-11-27 13:12:34.976|0 |2018-11-27 13:12:34.976|
[info] |6 |2018-11-27 13:12:40.976|6 |2018-11-27 13:12:40.976|
[info] |10 |2018-11-27 13:12:44.976|10 |2018-11-27 13:12:44.976|
[info] |8 |2018-11-27 13:12:42.976|8 |2018-11-27 13:12:42.976|
[info] |2 |2018-11-27 13:12:36.976|2 |2018-11-27 13:12:36.976|
[info] |4 |2018-11-27 13:12:38.976|4 |2018-11-27 13:12:38.976|
[info] +-----+-----------------------+-----+-----------------------+
[info] Batch: 2
[info] +-----+-----------------------+-----+-----------------------+
[info] |fooId|fooTime |barId|barTime |
[info] +-----+-----------------------+-----+-----------------------+
[info] |1 |2018-11-27 13:12:35.976|null |null |
[info] |3 |2018-11-27 13:12:37.976|null |null |
[info] |12 |2018-11-27 13:12:46.976|12 |2018-11-27 13:12:46.976|
[info] |18 |2018-11-27 13:12:52.976|18 |2018-11-27 13:12:52.976|
[info] |14 |2018-11-27 13:12:48.976|14 |2018-11-27 13:12:48.976|
[info] |20 |2018-11-27 13:12:54.976|20 |2018-11-27 13:12:54.976|
[info] |16 |2018-11-27 13:12:50.976|16 |2018-11-27 13:12:50.976|
[info] +-----+-----------------------+-----+-----------------------+
[info] Batch: 3
[info] +-----+-----------------------+-----+-----------------------+
[info] |fooId|fooTime |barId|barTime |
[info] +-----+-----------------------+-----+-----------------------+
[info] |26 |2018-11-27 13:13:00.976|26 |2018-11-27 13:13:00.976|
[info] |22 |2018-11-27 13:12:56.976|22 |2018-11-27 13:12:56.976|
[info] |7 |2018-11-27 13:12:41.976|null |null |
[info] |9 |2018-11-27 13:12:43.976|null |null |
[info] |28 |2018-11-27 13:13:02.976|28 |2018-11-27 13:13:02.976|
[info] |5 |2018-11-27 13:12:39.976|null |null |
[info] |11 |2018-11-27 13:12:45.976|null |null |
[info] |13 |2018-11-27 13:12:47.976|null |null |
[info] |24 |2018-11-27 13:12:58.976|24 |2018-11-27 13:12:58.976|
[info] +-----+-----------------------+-----+-----------------------+
{code}
On the other hand, if we reverse the condition so that fooStream rows occur at or before their barStream matches (fooTime <= barTime <= fooTime + 5 seconds):
{code:java}
val query = fooStream
  .withWatermark("fooTime", "5 seconds")
  .join(
    barStream.withWatermark("barTime", "5 seconds"),
    expr("""
      barId = fooId AND
      barTime >= fooTime AND
      barTime <= fooTime + interval 5 seconds
    """),
    joinType = "leftOuter"
  )
  .writeStream
  .format("console")
  .option("truncate", false)
  .start()
{code}
Some, but not all, previously matched records (even IDs) are emitted again with outer NULLs, along with all unmatched records (odd IDs):
{code:java}
[info] Batch: 1
[info] +-----+-----------------------+-----+-----------------------+
[info] |fooId|fooTime |barId|barTime |
[info] +-----+-----------------------+-----+-----------------------+
[info] |0 |2018-11-27 13:26:11.463|0 |2018-11-27 13:26:11.463|
[info] |6 |2018-11-27 13:26:17.463|6 |2018-11-27 13:26:17.463|
[info] |10 |2018-11-27 13:26:21.463|10 |2018-11-27 13:26:21.463|
[info] |8 |2018-11-27 13:26:19.463|8 |2018-11-27 13:26:19.463|
[info] |2 |2018-11-27 13:26:13.463|2 |2018-11-27 13:26:13.463|
[info] |4 |2018-11-27 13:26:15.463|4 |2018-11-27 13:26:15.463|
[info] +-----+-----------------------+-----+-----------------------+
[info] Batch: 2
[info] +-----+-----------------------+-----+-----------------------+
[info] |fooId|fooTime |barId|barTime |
[info] +-----+-----------------------+-----+-----------------------+
[info] |12 |2018-11-27 13:26:23.463|12 |2018-11-27 13:26:23.463|
[info] |18 |2018-11-27 13:26:29.463|18 |2018-11-27 13:26:29.463|
[info] |14 |2018-11-27 13:26:25.463|14 |2018-11-27 13:26:25.463|
[info] |20 |2018-11-27 13:26:31.463|20 |2018-11-27 13:26:31.463|
[info] |16 |2018-11-27 13:26:27.463|16 |2018-11-27 13:26:27.463|
[info] +-----+-----------------------+-----+-----------------------+
[info] Batch: 3
[info] +-----+-----------------------+-----+-----------------------+
[info] |fooId|fooTime |barId|barTime |
[info] +-----+-----------------------+-----+-----------------------+
[info] |26 |2018-11-27 13:26:37.463|26 |2018-11-27 13:26:37.463|
[info] |0 |2018-11-27 13:26:11.463|null |null |
[info] |22 |2018-11-27 13:26:33.463|22 |2018-11-27 13:26:33.463|
[info] |7 |2018-11-27 13:26:18.463|null |null |
[info] |9 |2018-11-27 13:26:20.463|null |null |
[info] |28 |2018-11-27 13:26:39.463|28 |2018-11-27 13:26:39.463|
[info] |5 |2018-11-27 13:26:16.463|null |null |
[info] |1 |2018-11-27 13:26:12.463|null |null |
[info] |3 |2018-11-27 13:26:14.463|null |null |
[info] |2 |2018-11-27 13:26:13.463|null |null |
[info] |4 |2018-11-27 13:26:15.463|null |null |
[info] |30 |2018-11-27 13:26:41.463|30 |2018-11-27 13:26:41.463|
[info] |24 |2018-11-27 13:26:35.463|24 |2018-11-27 13:26:35.463|
[info] +-----+-----------------------+-----+-----------------------+
[info] Batch: 4
[info] +-----+-----------------------+-----+-----------------------+
[info] |fooId|fooTime |barId|barTime |
[info] +-----+-----------------------+-----+-----------------------+
[info] |19 |2018-11-27 13:26:30.463|null |null |
[info] |34 |2018-11-27 13:26:45.463|34 |2018-11-27 13:26:45.463|
[info] |32 |2018-11-27 13:26:43.463|32 |2018-11-27 13:26:43.463|
[info] |17 |2018-11-27 13:26:28.463|null |null |
[info] |10 |2018-11-27 13:26:21.463|null |null |
[info] |12 |2018-11-27 13:26:23.463|null |null |
[info] |11 |2018-11-27 13:26:22.463|null |null |
[info] |13 |2018-11-27 13:26:24.463|null |null |
[info] |36 |2018-11-27 13:26:47.463|36 |2018-11-27 13:26:47.463|
[info] |14 |2018-11-27 13:26:25.463|null |null |
[info] |15 |2018-11-27 13:26:26.463|null |null |
[info] |38 |2018-11-27 13:26:49.463|38 |2018-11-27 13:26:49.463|
[info] |40 |2018-11-27 13:26:51.463|40 |2018-11-27 13:26:51.463|
[info] +-----+-----------------------+-----+-----------------------+
{code}
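To make the duplication explicit, the second run's output can be encoded as (fooId, matched) pairs and checked for IDs that appear both with a match and with outer NULLs. This is a plain Scala sketch, independent of Spark; DuplicateEmissions, duplicated and secondRun are hypothetical names introduced only for this check, and the data is transcribed by hand from Batches 1-4 above:
{code:java}
// Hypothetical helper, not part of Spark: flags fooIds that a left outer join
// emitted both with a match and with outer NULLs (which should never happen).
object DuplicateEmissions {
  // Each row is (fooId, matched), where matched means barId was non-null.
  def duplicated(rows: Seq[(Long, Boolean)]): Seq[Long] =
    rows.groupBy(_._1)
      .collect { case (id, rs) if rs.exists(_._2) && rs.exists(!_._2) => id }
      .toSeq
      .sorted

  // Transcribed from the second run's console output.
  val secondRun: Seq[(Long, Boolean)] =
    Seq(0L, 6L, 10L, 8L, 2L, 4L).map(_ -> true) ++              // Batch 1
    Seq(12L, 18L, 14L, 20L, 16L).map(_ -> true) ++              // Batch 2
    Seq(26L, 22L, 28L, 30L, 24L).map(_ -> true) ++              // Batch 3, matched
    Seq(0L, 7L, 9L, 5L, 1L, 3L, 2L, 4L).map(_ -> false) ++      // Batch 3, NULLs
    Seq(34L, 32L, 36L, 38L, 40L).map(_ -> true) ++              // Batch 4, matched
    Seq(19L, 17L, 10L, 12L, 11L, 13L, 14L, 15L).map(_ -> false) // Batch 4, NULLs

  def main(args: Array[String]): Unit =
    println(duplicated(secondRun).mkString(", "))
}
{code}
Running it prints 0, 2, 4, 10, 12, 14: each of these IDs was first emitted with its match (Batch 1 or 2) and later emitted again with NULLs (Batch 3 or 4). Encoding the first run's Batches 1-3 the same way gives an empty result.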
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)