Posted to issues@spark.apache.org by "jingshanglu (Jira)" <ji...@apache.org> on 2019/10/10 10:14:00 UTC

[jira] [Updated] (SPARK-29426) Watermark does not take effect

     [ https://issues.apache.org/jira/browse/SPARK-29426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jingshanglu updated SPARK-29426:
--------------------------------
    Environment:     (was: My Kafka messages look like this:
{code:java}

[kafka@HC-25-28-36 ~]$ kafka-console-producer.sh --broker-list 172.25.28.38:9092,172.25.28.37:9092,172.25.28.36:9092 --topic test0
{"sql":"select * from user","timestamp":"2019-03-05 12:22:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:32:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:32:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-04 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{code}
The output looks like this:
{code:java}
Batch: 5
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:20...|select * from user|192.168.54.6|172.0.0.1|    2|
|[2019-03-05 12:15...|select * from user|192.168.54.6|172.0.0.1|    2|
+--------------------+------------------+------------+---------+-----+

-------------------------------------------
Batch: 6
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:20...|select * from user|192.168.54.6|172.0.0.1|    3|
|[2019-03-05 12:15...|select * from user|192.168.54.6|172.0.0.1|    3|
+--------------------+------------------+------------+---------+-----+

-------------------------------------------
Batch: 7
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-04 12:20...|select * from user|192.168.54.6|172.0.0.1|    1|
|[2019-03-04 12:15...|select * from user|192.168.54.6|172.0.0.1|    1|
+--------------------+------------------+------------+---------+-----+
{code}
The watermark is behind the event time (2019-03-04 12:23:22), but this event

{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}

is still aggregated.)
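To make the watermark arithmetic concrete: the Structured Streaming Programming Guide describes the watermark as the maximum event time seen so far minus the delay passed to withWatermark (here "1 minute"). The Java sketch below applies that rule to the message timestamps listed above. It is a simplified model under stated assumptions, not Spark's implementation: the class and method names (WatermarkSketch, watermark, olderThanWatermark) are hypothetical, all previously seen events are treated as one batch, and Spark itself recomputes the watermark at the end of each trigger rather than after every event.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

// Simplified watermark model (an assumption for illustration, not Spark's code):
// watermark = (max event time seen so far) - (delay passed to withWatermark).
// Spark itself recomputes the watermark once per trigger, not per event.
final class WatermarkSketch {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static LocalDateTime parse(String text) {
        return LocalDateTime.parse(text, FMT);
    }

    // Max event time among the already-seen events, minus the delay.
    static LocalDateTime watermark(List<String> seenEventTimes, Duration delay) {
        LocalDateTime max = null;
        for (String t : seenEventTimes) {
            LocalDateTime ts = parse(t);
            if (max == null || ts.isAfter(max)) {
                max = ts;
            }
        }
        if (max == null) {
            throw new IllegalArgumentException("no events seen yet");
        }
        return max.minus(delay);
    }

    // True if the event time is strictly earlier than the watermark.
    static boolean olderThanWatermark(String eventTime, LocalDateTime watermark) {
        return parse(eventTime).isBefore(watermark);
    }

    public static void main(String[] args) {
        // The first two event times sent to the topic; the third message follows them.
        List<String> seen = Arrays.asList("2019-03-05 12:22:22", "2019-03-05 12:32:22");
        LocalDateTime wm = watermark(seen, Duration.ofMinutes(1));
        System.out.println("watermark = " + wm.format(FMT)); // prints "watermark = 2019-03-05 12:31:22"
        System.out.println(olderThanWatermark("2019-03-05 12:22:29", wm)); // prints "true"
        System.out.println(olderThanWatermark("2019-03-04 12:22:29", wm)); // prints "true"
    }
}
```

Under this model, once 2019-03-05 12:32:22 has been seen, both the 2019-03-05 12:22:29 and the 2019-03-04 12:22:29 events are older than the watermark.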

> Watermark does not take effect
> ------------------------------
>
>                 Key: SPARK-29426
>                 URL: https://issues.apache.org/jira/browse/SPARK-29426
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.3
>            Reporter: jingshanglu
>            Priority: Major
>
> I use withWatermark and window to express windowed aggregations, but the watermark does not take effect.
> My code:
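> {code:java}
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.functions;
> import org.apache.spark.sql.streaming.StreamingQuery;
>
> // `spark` is the SparkSession and `mes` the streaming Dataset parsed from Kafka (not shown);
> // withWatermark requires the "timestamp" column to be of timestamp type.
> // Count rows per 10-minute window sliding every 5 minutes, with a 1-minute watermark delay.
> Dataset<Row> clientSqlIpCount = mes.withWatermark("timestamp", "1 minute")
>         .groupBy(
>                 functions.window(mes.col("timestamp"), "10 minutes", "5 minutes"),
>                 mes.col("sql"), mes.col("client"), mes.col("ip"))
>         .count();
> // Update mode writes only the result rows that changed in each micro-batch.
> StreamingQuery query = clientSqlIpCount
>         .writeStream()
>         .outputMode("Update")
>         .format("console")
>         .start();
> spark.streams().awaitAnyTermination();
> {code}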
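The console output shows each event counted in two windows, one starting at 12:15 and one at 12:20. That follows from a 10-minute window sliding every 5 minutes: every timestamp falls into 10 / 5 = 2 overlapping windows. The Java sketch below reproduces that assignment, assuming windows are aligned to multiples of the slide since the epoch (no start offset) and that timestamps are read as UTC. SlidingWindowSketch and windowStarts are hypothetical names used for illustration, not Spark API.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Sketch of sliding-window assignment, assuming windows aligned to multiples of
// the slide since the epoch (start offset 0) and timestamps interpreted as UTC.
final class SlidingWindowSketch {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Returns the start of every window [start, start + size) that contains eventTime,
    // latest start first.
    static List<String> windowStarts(String eventTime, Duration size, Duration slide) {
        long t = LocalDateTime.parse(eventTime, FMT).toEpochSecond(ZoneOffset.UTC);
        long sizeSec = size.getSeconds();
        long slideSec = slide.getSeconds();
        // Latest slide-aligned start at or before t (floorMod also handles negative t).
        long lastStart = t - Math.floorMod(t, slideSec);
        List<String> starts = new ArrayList<>();
        // Step back one slide at a time while the window [start, start + size) still covers t.
        for (long start = lastStart; start > t - sizeSec; start -= slideSec) {
            starts.add(LocalDateTime.ofEpochSecond(start, 0, ZoneOffset.UTC).format(FMT));
        }
        return starts;
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(10);
        Duration slide = Duration.ofMinutes(5);
        // prints "[2019-03-05 12:20:00, 2019-03-05 12:15:00]"
        System.out.println(windowStarts("2019-03-05 12:22:29", size, slide));
    }
}
```

The same computation for 2019-03-04 12:22:29 yields windows starting at 2019-03-04 12:20:00 and 12:15:00, matching the two rows in Batch 7.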



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
