Posted to issues@spark.apache.org by "jingshanglu (Jira)" <ji...@apache.org> on 2019/10/10 10:12:00 UTC
[jira] [Created] (SPARK-29426) Watermark does not take effect
jingshanglu created SPARK-29426:
-----------------------------------
Summary: Watermark does not take effect
Key: SPARK-29426
URL: https://issues.apache.org/jira/browse/SPARK-29426
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 2.4.3
Environment: My Kafka messages look like this:
{code:java}
[kafka@HC-25-28-36 ~]$ kafka-console-producer.sh --broker-list 172.25.28.38:9092,172.25.28.37:9092,172.25.28.36:9092 --topic test0
{"sql":"select * from user","timestamp":"2019-03-05 12:22:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:32:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:32:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-04 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{code}
The output looks like this:
{code:java}
Batch: 5
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
| window| sql| client| ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:20...|select * from user|192.168.54.6|172.0.0.1| 2|
|[2019-03-05 12:15...|select * from user|192.168.54.6|172.0.0.1| 2|
+--------------------+------------------+------------+---------+-----+
-------------------------------------------
Batch: 6
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
| window| sql| client| ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:20...|select * from user|192.168.54.6|172.0.0.1| 3|
|[2019-03-05 12:15...|select * from user|192.168.54.6|172.0.0.1| 3|
+--------------------+------------------+------------+---------+-----+
-------------------------------------------
Batch: 7
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
| window| sql| client| ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-04 12:20...|select * from user|192.168.54.6|172.0.0.1| 1|
|[2019-03-04 12:15...|select * from user|192.168.54.6|172.0.0.1| 1|
+--------------------+------------------+------------+---------+-----+
{code}
The watermark is behind the event time (2019-03-04 12:23:22), but this event
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
is still aggregated.
Reporter: jingshanglu
I use withWatermark and window to express a windowed aggregation, but the watermark does not take effect.
My code:
{code:java}
Dataset<Row> clientSqlIpCount = mes.withWatermark("timestamp", "1 minute")
    .groupBy(
        functions.window(mes.col("timestamp"), "10 minutes", "5 minutes"),
        mes.col("sql"), mes.col("client"), mes.col("ip"))
    .count();
StreamingQuery query = clientSqlIpCount
    .writeStream()
    .outputMode("Update")
    .format("console")
    .start();
spark.streams().awaitAnyTermination();
{code}
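To make the expectation concrete, the arithmetic behind the query above can be sketched without Spark. This is only a rough model, assuming the watermark is the latest event time seen so far minus the "1 minute" delay, that sliding windows are aligned to the epoch with timestamps treated as UTC, and that a window counts as late once its end is at or before the watermark. The class WatermarkSketch and its methods are hypothetical names for illustration, not Spark APIs.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; none of these names exist in Spark.
final class WatermarkSketch {
    private static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static LocalDateTime parse(String s) {
        return LocalDateTime.parse(s, FMT);
    }

    // Assumed model: watermark = latest event time seen so far minus the delay.
    static LocalDateTime watermark(List<String> eventTimes, Duration delay) {
        LocalDateTime max = parse(eventTimes.get(0));
        for (String s : eventTimes) {
            LocalDateTime t = parse(s);
            if (t.isAfter(max)) {
                max = t;
            }
        }
        return max.minus(delay);
    }

    // Start times (latest first) of the sliding windows [start, start + size)
    // that contain the event. Assumes epoch-aligned windows and UTC timestamps.
    static List<LocalDateTime> windowStarts(String eventTime, Duration size, Duration slide) {
        long epoch = parse(eventTime).toEpochSecond(ZoneOffset.UTC);
        long lastStart = epoch - Math.floorMod(epoch, slide.getSeconds());
        List<LocalDateTime> starts = new ArrayList<>();
        for (long s = lastStart; s + size.getSeconds() > epoch; s -= slide.getSeconds()) {
            starts.add(LocalDateTime.ofEpochSecond(s, 0, ZoneOffset.UTC));
        }
        return starts;
    }

    // Assumed rule: a window is late once its end is at or before the watermark.
    static boolean isLate(LocalDateTime windowStart, Duration size, LocalDateTime watermark) {
        return !windowStart.plus(size).isAfter(watermark);
    }

    public static void main(String[] args) {
        // Event times taken from the first two sample messages.
        List<String> seen = Arrays.asList("2019-03-05 12:22:22", "2019-03-05 12:32:22");
        Duration size = Duration.ofMinutes(10);
        LocalDateTime wm = watermark(seen, Duration.ofMinutes(1));
        System.out.println("watermark = " + wm);
        for (LocalDateTime start : windowStarts("2019-03-05 12:22:29", size, Duration.ofMinutes(5))) {
            System.out.println(start + " late=" + isLate(start, size, wm));
        }
    }
}
```

Under these assumptions, once the 12:32:22 message has been seen the watermark is 2019-03-05 12:31:22, and both windows containing the 12:22:29 event (ending at 12:25:00 and 12:30:00) are already behind it. As far as I understand, Spark only advances the watermark between micro-batches, so whether a late row is actually dropped may also depend on which batch it arrives in.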