Posted to issues@spark.apache.org by "jingshanglu (Jira)" <ji...@apache.org> on 2019/10/10 10:14:00 UTC
[jira] [Updated] (SPARK-29426) Watermark does not take effect
[ https://issues.apache.org/jira/browse/SPARK-29426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
jingshanglu updated SPARK-29426:
--------------------------------
Environment: (was: the same Kafka messages, console output, and note that are reproduced below)
My Kafka messages look like this:
{code}
[kafka@HC-25-28-36 ~]$ kafka-console-producer.sh --broker-list 172.25.28.38:9092,172.25.28.37:9092,172.25.28.36:9092 --topic test0
{"sql":"select * from user","timestamp":"2019-03-05 12:22:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:32:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:32:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-04 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{code}
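A minimal sketch of the arithmetic behind {{withWatermark("timestamp", "1 minute")}}, assuming all eight messages above have already been processed: the watermark is the maximum event time seen so far minus the 1-minute delay. Spark advances the watermark at batch boundaries rather than per record, so its value at any particular batch depends on how the messages were split into batches, which this sketch does not model. The class {{WatermarkSketch}} is a hypothetical helper, not a Spark API.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

// Hypothetical helper (not part of Spark): only the watermark arithmetic.
public class WatermarkSketch {
    // Same layout as the "timestamp" field in the Kafka messages.
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Watermark = max(event time seen so far) - delay threshold.
    static LocalDateTime watermark(List<String> eventTimes, Duration delay) {
        if (eventTimes.isEmpty()) {
            throw new IllegalArgumentException("need at least one event time");
        }
        LocalDateTime max = LocalDateTime.parse(eventTimes.get(0), FMT);
        for (String t : eventTimes) {
            LocalDateTime ts = LocalDateTime.parse(t, FMT);
            if (ts.isAfter(max)) {
                max = ts;
            }
        }
        return max.minus(delay);
    }

    public static void main(String[] args) {
        // Event times of the eight messages, in the order they were produced.
        List<String> times = List.of(
            "2019-03-05 12:22:22", "2019-03-05 12:32:22", "2019-03-05 12:22:29",
            "2019-03-05 12:22:22", "2019-03-05 12:32:22", "2019-03-05 12:22:29",
            "2019-03-05 12:22:29", "2019-03-04 12:22:29");
        // Max event time is 2019-03-05 12:32:22, so this prints 2019-03-05 12:31:22.
        System.out.println(watermark(times, Duration.ofMinutes(1)).format(FMT));
    }
}
```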
The console output looks like this:
{code}
Batch: 5
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:20...|select * from user|192.168.54.6|172.0.0.1|    2|
|[2019-03-05 12:15...|select * from user|192.168.54.6|172.0.0.1|    2|
+--------------------+------------------+------------+---------+-----+
-------------------------------------------
Batch: 6
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:20...|select * from user|192.168.54.6|172.0.0.1|    3|
|[2019-03-05 12:15...|select * from user|192.168.54.6|172.0.0.1|    3|
+--------------------+------------------+------------+---------+-----+
-------------------------------------------
Batch: 7
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-04 12:20...|select * from user|192.168.54.6|172.0.0.1|    1|
|[2019-03-04 12:15...|select * from user|192.168.54.6|172.0.0.1|    1|
+--------------------+------------------+------------+---------+-----+
{code}
The event time (2019-03-04 12:23:22) is already behind the watermark, but this event
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
is still aggregated.
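To see why the 2019-03-04 message looks too late, the sketch below checks whether every sliding window containing an event has already ended at or before the watermark. It assumes a watermark of 2019-03-05 12:31:22 (the latest event time, 12:32:22, minus the 1-minute delay, once all messages have been seen), 10-minute windows sliding every 5 minutes and aligned to multiples of the slide, and it treats a window as closed when its end is not after the watermark. {{LateEventCheck}} and {{allWindowsClosed}} are hypothetical names. Note that the Structured Streaming programming guide describes the watermark guarantee as one-directional: data within the delay threshold is guaranteed to be aggregated, but data later than that is not guaranteed to be dropped.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper (not part of Spark): is an event older than all of its windows' ends?
public class LateEventCheck {
    // True if every sliding window containing eventTime has closed,
    // i.e. the latest such window ends at or before the watermark.
    static boolean allWindowsClosed(LocalDateTime eventTime, LocalDateTime watermark,
                                    Duration length, Duration slide) {
        long t = eventTime.toEpochSecond(ZoneOffset.UTC);
        // Start of the latest window that contains eventTime (aligned to the slide).
        long lastStart = t - Math.floorMod(t, slide.getSeconds());
        LocalDateTime latestEnd =
            LocalDateTime.ofEpochSecond(lastStart + length.getSeconds(), 0, ZoneOffset.UTC);
        return !latestEnd.isAfter(watermark);
    }

    public static void main(String[] args) {
        // Assumed watermark: max event time 12:32:22 minus the 1-minute delay.
        LocalDateTime watermark = LocalDateTime.of(2019, 3, 5, 12, 31, 22);
        Duration length = Duration.ofMinutes(10);
        Duration slide = Duration.ofMinutes(5);
        // 2019-03-04 12:22:29: its windows end at 12:25 and 12:30 on 2019-03-04.
        System.out.println(allWindowsClosed(
            LocalDateTime.of(2019, 3, 4, 12, 22, 29), watermark, length, slide));
        // 2019-03-05 12:22:29: its windows end at 12:25 and 12:30, also before 12:31:22.
        System.out.println(allWindowsClosed(
            LocalDateTime.of(2019, 3, 5, 12, 22, 29), watermark, length, slide));
    }
}
```

Both calls print {{true}}: under these assumptions, once the watermark reaches 12:31:22 neither event falls into a window that is still open.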
> Watermark does not take effect
> ------------------------------
>
> Key: SPARK-29426
> URL: https://issues.apache.org/jira/browse/SPARK-29426
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.3
> Reporter: jingshanglu
> Priority: Major
>
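> I use withWatermark and window to express windowed aggregations, but the watermark does not take effect.
> My code:
> {code:java}
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.functions;
> import org.apache.spark.sql.streaming.StreamingQuery;
>
> // 1-minute watermark on the event-time column; 10-minute windows sliding every 5 minutes
> Dataset<Row> clientSqlIpCount = mes.withWatermark("timestamp", "1 minute")
>     .groupBy(
>         functions.window(mes.col("timestamp"), "10 minutes", "5 minutes"),
>         mes.col("sql"), mes.col("client"), mes.col("ip"))
>     .count();
> // Update mode writes only the rows updated since the last trigger
> StreamingQuery query = clientSqlIpCount
>     .writeStream()
>     .outputMode("Update")
>     .format("console")
>     .start();
> spark.streams().awaitAnyTermination();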
> {code}
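The query groups by {{window(timestamp, "10 minutes", "5 minutes")}}, so each event falls into two overlapping windows, which is why every batch in the output shows one row starting at 12:15 and one starting at 12:20. A minimal sketch of that assignment, assuming half-open [start, end) windows aligned to multiples of the slide (consistent with the window starts in the output); {{SlidingWindows}} and {{windowsFor}} are hypothetical names, not Spark APIs.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Spark): which sliding windows contain an event?
public class SlidingWindows {
    // Returns the [start, end) windows containing eventTime, latest start first.
    static List<LocalDateTime[]> windowsFor(LocalDateTime eventTime, Duration length, Duration slide) {
        long t = eventTime.toEpochSecond(ZoneOffset.UTC);
        long slideSec = slide.getSeconds();
        long lengthSec = length.getSeconds();
        // Latest window start that is <= eventTime.
        long lastStart = t - Math.floorMod(t, slideSec);
        List<LocalDateTime[]> windows = new ArrayList<>();
        // A window [start, start + length) contains t exactly when start > t - length.
        for (long start = lastStart; start > t - lengthSec; start -= slideSec) {
            windows.add(new LocalDateTime[] {
                LocalDateTime.ofEpochSecond(start, 0, ZoneOffset.UTC),
                LocalDateTime.ofEpochSecond(start + lengthSec, 0, ZoneOffset.UTC)
            });
        }
        return windows;
    }

    public static void main(String[] args) {
        LocalDateTime event = LocalDateTime.of(2019, 3, 5, 12, 22, 22);
        for (LocalDateTime[] w : windowsFor(event, Duration.ofMinutes(10), Duration.ofMinutes(5))) {
            System.out.println(w[0] + " -> " + w[1]);
        }
    }
}
```

For the 2019-03-05 12:22:22 message this lists the windows [12:20, 12:30) and [12:15, 12:25), matching the two rows per batch above.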
--
This message was sent by Atlassian Jira
(v8.3.4#803005)