You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2019/10/08 05:44:16 UTC

[jira] [Resolved] (SPARK-24382) Spark Structured Streaming aggregation on old timestamp data

     [ https://issues.apache.org/jira/browse/SPARK-24382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-24382.
----------------------------------
    Resolution: Incomplete

> Spark Structured Streaming aggregation on old timestamp data
> ------------------------------------------------------------
>
>                 Key: SPARK-24382
>                 URL: https://issues.apache.org/jira/browse/SPARK-24382
>             Project: Spark
>          Issue Type: Question
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Karthik
>            Priority: Major
>              Labels: beginner, bulk-closed
>
> I am trying to aggregate the count of records every 10 seconds using the structured streaming for the following incoming kafka data
> {code:java}
> { 
> "ts2" : "2018/05/01 00:02:50.041", 
> "serviceGroupId" : "123", 
> "userId" : "avv-0", 
> "stream" : "", 
> "lastUserActivity" : "00:02:50", 
> "lastUserActivityCount" : "0" 
> } 
> { 
> "ts2" : "2018/05/01 00:09:02.079", 
> "serviceGroupId" : "123", 
> "userId" : "avv-0", 
> "stream" : "", 
> "lastUserActivity" : "00:09:02", 
> "lastUserActivityCount" : "0" 
> } 
> { 
> "ts2" : "2018/05/01 00:09:02.086", 
> "serviceGroupId" : "123", 
> "userId" : "avv-2", 
> "stream" : "", 
> "lastUserActivity" : "00:09:02", 
> "lastUserActivityCount" : "0" 
> }
> {code}
> With the following logic
> {code:java}
> val sdvTuneInsAgg1 = df 
>  .withWatermark("ts2", "10 seconds") 
>  .groupBy(window(col("ts2"),"10 seconds")) 
>  .agg(count("*") as "count") 
>  .as[CountMetric1]
> val query1 = sdvTuneInsAgg1.writeStream
> .format("console")
> .foreach(writer)
> .start()
> {code}
> and I do not see any records inside the writer. But, the only anomaly is that the current date is 2018/05/24 but the record that I am processing (ts2) has old dates. Will aggregation / count work in this scenario ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org