Posted to issues@spark.apache.org by "jingshanglu (Jira)" <ji...@apache.org> on 2019/10/10 10:12:00 UTC

[jira] [Created] (SPARK-29426) Watermark does not take effect

jingshanglu created SPARK-29426:
-----------------------------------

             Summary: Watermark does not take effect
                 Key: SPARK-29426
                 URL: https://issues.apache.org/jira/browse/SPARK-29426
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.4.3
         Environment: My Kafka messages look like this:
{code:java}
[kafka@HC-25-28-36 ~]$ kafka-console-producer.sh --broker-list 172.25.28.38:9092,172.25.28.37:9092,172.25.28.36:9092 --topic test0
{"sql":"select * from user","timestamp":"2019-03-05 12:22:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:32:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:32:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-04 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{code}
The output looks like this:
{code:java}
Batch: 5
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:20...|select * from user|192.168.54.6|172.0.0.1|    2|
|[2019-03-05 12:15...|select * from user|192.168.54.6|172.0.0.1|    2|
+--------------------+------------------+------------+---------+-----+
-------------------------------------------
Batch: 6
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:20...|select * from user|192.168.54.6|172.0.0.1|    3|
|[2019-03-05 12:15...|select * from user|192.168.54.6|172.0.0.1|    3|
+--------------------+------------------+------------+---------+-----+
-------------------------------------------
Batch: 7
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-04 12:20...|select * from user|192.168.54.6|172.0.0.1|    1|
|[2019-03-04 12:15...|select * from user|192.168.54.6|172.0.0.1|    1|
+--------------------+------------------+------------+---------+-----+
{code}
The watermark is behind the event time (2019-03-04 12:23:22), but this event

{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}

is still aggregated.
            Reporter: jingshanglu


I use withWatermark and window to express windowed aggregations, but the Watermark does not take effect.

My code:
{code:java}
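// Imports needed by this fragment. `mes` (the streaming Dataset) and `spark`
// (presumably the SparkSession) are defined elsewhere and not shown in this report.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;

// Declare a 1-minute watermark on "timestamp", then count rows per
// 10-minute window sliding every 5 minutes, grouped by sql, client and ip.
Dataset<Row> clientSqlIpCount = mes.withWatermark("timestamp","1 minute")
        .groupBy(
                functions.window(mes.col("timestamp"),"10 minutes","5 minutes"),
                mes.col("sql"),mes.col("client"),mes.col("ip"))
        .count();
// Write the updated counts to the console after each micro-batch.
StreamingQuery query = clientSqlIpCount
                .writeStream()
                .outputMode("Update")
                .format("console")
                .start();
spark.streams().awaitAnyTermination();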
{code}
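
To make the expectation behind this report concrete, here is a minimal sketch of the watermark arithmetic in plain Java, without Spark. It assumes the watermark is the maximum event time seen so far minus the delay, that windows are aligned to the epoch in UTC, and that an event counts as late once every window containing it ends at or before the watermark. The class and method names are hypothetical. Spark only advances the watermark between micro-batches, so this illustrates the expected result and is not Spark's implementation.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Spark: models the expected watermark arithmetic only.
final class WatermarkSketch {
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Assumed rule: watermark = max event time seen so far minus the delay threshold.
    public static LocalDateTime watermark(List<String> seenTimestamps, Duration delay) {
        if (seenTimestamps.isEmpty()) {
            throw new IllegalArgumentException("no events seen yet");
        }
        LocalDateTime max = null;
        for (String s : seenTimestamps) {
            LocalDateTime t = LocalDateTime.parse(s, FMT);
            if (max == null || t.isAfter(max)) {
                max = t;
            }
        }
        return max.minus(delay);
    }

    // End of the latest-starting sliding window that still contains the event.
    // Assumes window starts fall on multiples of the slide, counted from the epoch in UTC.
    public static LocalDateTime lastWindowEnd(LocalDateTime event, Duration window, Duration slide) {
        long epoch = event.toEpochSecond(ZoneOffset.UTC);
        long lastStart = epoch - Math.floorMod(epoch, slide.getSeconds());
        return LocalDateTime.ofEpochSecond(lastStart + window.getSeconds(), 0, ZoneOffset.UTC);
    }

    // Assumed rule: the event is late when even its last window ends at or before the watermark.
    public static boolean isLate(String event, LocalDateTime watermark, Duration window, Duration slide) {
        LocalDateTime t = LocalDateTime.parse(event, FMT);
        return !lastWindowEnd(t, window, slide).isAfter(watermark);
    }

    public static void main(String[] args) {
        // The first two messages from the Environment section above.
        List<String> seen = Arrays.asList("2019-03-05 12:22:22", "2019-03-05 12:32:22");
        Duration delay = Duration.ofMinutes(1);
        Duration window = Duration.ofMinutes(10);
        Duration slide = Duration.ofMinutes(5);

        LocalDateTime wm = watermark(seen, delay);
        System.out.println("watermark = " + wm.format(FMT));
        for (String e : Arrays.asList(
                "2019-03-05 12:22:29", "2019-03-04 12:22:29", "2019-03-05 12:32:22")) {
            System.out.println(e + " late = " + isLate(e, wm, window, slide));
        }
    }
}
```

Under these assumptions the watermark after the first two messages is 2019-03-05 12:31:22. The 12:22:29 events fall only in windows ending at 12:25 and 12:30, so the sketch classifies them as late, while the 12:32:22 event is not.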


