Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2019/10/08 05:44:16 UTC
[jira] [Resolved] (SPARK-24382) Spark Structured Streaming aggregation on old timestamp data
[ https://issues.apache.org/jira/browse/SPARK-24382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-24382.
----------------------------------
Resolution: Incomplete
> Spark Structured Streaming aggregation on old timestamp data
> ------------------------------------------------------------
>
> Key: SPARK-24382
> URL: https://issues.apache.org/jira/browse/SPARK-24382
> Project: Spark
> Issue Type: Question
> Components: Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Karthik
> Priority: Major
> Labels: beginner, bulk-closed
>
> I am trying to aggregate the count of records in 10-second windows using Structured Streaming, on the following incoming Kafka data:
> {code:java}
> {
> "ts2" : "2018/05/01 00:02:50.041",
> "serviceGroupId" : "123",
> "userId" : "avv-0",
> "stream" : "",
> "lastUserActivity" : "00:02:50",
> "lastUserActivityCount" : "0"
> }
> {
> "ts2" : "2018/05/01 00:09:02.079",
> "serviceGroupId" : "123",
> "userId" : "avv-0",
> "stream" : "",
> "lastUserActivity" : "00:09:02",
> "lastUserActivityCount" : "0"
> }
> {
> "ts2" : "2018/05/01 00:09:02.086",
> "serviceGroupId" : "123",
> "userId" : "avv-2",
> "stream" : "",
> "lastUserActivity" : "00:09:02",
> "lastUserActivityCount" : "0"
> }
> {code}
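In these records ts2 is a string in yyyy/MM/dd HH:mm:ss.SSS form, while event-time operations such as withWatermark and window expect a timestamp-typed column. Below is a minimal plain-Scala check of that pattern with java.time. It assumes that ts2 has not yet been converted in df and treats the values as UTC, because the issue does not say which zone they are in. Ts2Format and toEpochMillis are hypothetical names.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper: checks the pattern that ts2 appears to use in the sample records.
// Assumption: ts2 arrives as a string in the Kafka JSON payload, not as a timestamp.
object Ts2Format {
  val pattern = "yyyy/MM/dd HH:mm:ss.SSS"
  private val formatter = DateTimeFormatter.ofPattern(pattern)

  // Parses a ts2 value and returns epoch milliseconds.
  // Assumption: the values are interpreted as UTC; the issue does not state their zone.
  def toEpochMillis(ts2: String): Long =
    LocalDateTime.parse(ts2, formatter).toInstant(ZoneOffset.UTC).toEpochMilli
}
```

If df does hold ts2 as a string, the same pattern string could be passed to Spark's to_timestamp(col("ts2"), "yyyy/MM/dd HH:mm:ss.SSS") (available since Spark 2.2.0) to produce a timestamp column before calling withWatermark.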
> using the following logic:
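> {code:scala}
> import org.apache.spark.sql.functions.{col, count, window}
> import spark.implicits._ // encoder needed for .as[CountMetric1]
>
> // Count records per 10-second event-time window, tolerating rows up to 10 seconds behind the latest event time seen
> val sdvTuneInsAgg1 = df
>   .withWatermark("ts2", "10 seconds")
>   .groupBy(window(col("ts2"), "10 seconds"))
>   .agg(count("*") as "count")
>   .as[CountMetric1]
>
> // foreach(writer) selects the foreach sink, so an earlier .format("console") would be ignored;
> // append is the default output mode and is spelled out here for clarity
> val query1 = sdvTuneInsAgg1.writeStream
>   .outputMode("append")
>   .foreach(writer)
>   .start()
> {code}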
> However, I do not see any records inside the writer. The only anomaly is that the current date is 2018/05/24, while the records I am processing have older ts2 dates. Will the aggregation/count work in this scenario?
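To my understanding, Spark's watermark trails the maximum event time the query has seen (minus the delay) rather than the wall clock, so old ts2 dates by themselves should not block output. What the sample data may run into is append mode: a window is written only once the watermark passes the window's end, and the watermark computed in one micro-batch is applied in the next one. The toy model below is plain Scala, not Spark's code, and sketches that rule. Event times are milliseconds since 2018/05/01 00:00:00. WatermarkToyModel is a hypothetical name, and dropping of late rows is deliberately left out.

```scala
import scala.collection.mutable

// Toy model (hypothetical, not Spark's implementation) of count() over 10-second
// tumbling windows with a 10-second watermark, written out in append output mode.
object WatermarkToyModel {
  val windowMs = 10000L // as in window(col("ts2"), "10 seconds")
  val delayMs = 10000L  // as in withWatermark("ts2", "10 seconds")

  // Each inner Seq is one micro-batch of event times in milliseconds.
  // Returns, for every batch, the (windowStart, count) pairs emitted by that batch.
  def run(batches: Seq[Seq[Long]]): Seq[List[(Long, Long)]] = {
    val state = mutable.Map.empty[Long, Long] // windowStart -> running count
    var watermark = Long.MinValue             // no watermark before the first batch
    var maxEventTime = Long.MinValue          // largest event time seen so far

    batches.map { batch =>
      // 1. Assign each event to its tumbling window and bump that window's count.
      //    Dropping of rows that arrive behind the watermark is not modelled.
      batch.foreach { ts =>
        val start = ts - Math.floorMod(ts, windowMs)
        state(start) = state.getOrElse(start, 0L) + 1L
        maxEventTime = math.max(maxEventTime, ts)
      }
      // 2. Append mode: emit and forget every window whose end is <= the watermark,
      //    using the watermark computed at the end of the previous batch.
      val finished = state.keys.filter(start => start + windowMs <= watermark).toList.sorted
      val emitted = finished.map(start => (start, state(start)))
      finished.foreach(start => state.remove(start))
      // 3. Advance the watermark from event times only; wall-clock time is never consulted.
      if (maxEventTime != Long.MinValue) watermark = math.max(watermark, maxEventTime - delayMs)
      emitted
    }
  }
}
```

With the three sample records in one batch (170041, 542079 and 542086 ms), the model emits nothing for that batch. A hypothetical later record at 560000 ms (00:09:20.000) then releases the 00:02:50 window with count 1, and one more trigger releases the 00:09:00 window with count 2. As far as I know, Spark 2.2.0 only starts a micro-batch when new data arrives (later releases can also run no-data batches), so without further records the writer may never receive the finished windows.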
--
This message was sent by Atlassian Jira
(v8.3.4#803005)