Posted to user@spark.apache.org by Xavier Gervilla <xa...@datapta.com> on 2022/04/27 14:44:51 UTC

[window aggregate][debug] Rows not dropping with watermark and window

Hi team,

With your help last week, I was able to adapt a project I'm developing to apply sentiment analysis and named entity recognition (NER) to streaming tweets. One of my next steps, to keep memory usage from growing until it collapses, is to apply windows and watermarks so that tweets are discarded after some time. However, the "Aggregated Number Of Rows Dropped By Watermark" metric in the Spark UI is always 0.

This is the updated code I use to apply the sentiment and NER predictions and to add the timestamp column:

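from pyspark.sql.functions import current_timestamp
from sparknlp.pretrained import PretrainedPipeline

# Spark NLP pretrained pipelines for sentiment and named entity recognition
sentPipeline = PretrainedPipeline('analyze_sentiment')
nerPipeline = PretrainedPipeline('recognize_entities_dl')

sentPred = sentPipeline.transform(tweets)  # tweets: the streaming DataFrame
nerPred = nerPipeline.transform(sentPred)
# current_timestamp() stamps each row with the time of its micro-batch
tsCol = nerPred.withColumn('timestamp', current_timestamp())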
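
Because the timestamp column is filled with current_timestamp(), every row's "event time" is simply the processing time of its micro-batch, so it can never be older than a watermark derived from earlier batches. The plain-Python model below (not Spark code; count_late_rows is a hypothetical helper, and the 90-second batch spacing and the 15-minute-old tweet are made-up illustrative values) assumes the watermark used by a batch is the maximum event time of earlier batches minus the delay, and counts the rows that would be treated as late.

```python
from datetime import datetime, timedelta


def count_late_rows(batches, delay):
    """Count rows a watermark would discard as late (simplified model).

    batches: list of micro-batches, each a list of event-time datetimes.
    delay:   watermark delay, e.g. timedelta(minutes=10).
    Assumption: the watermark used while processing a batch is the maximum
    event time seen in earlier batches minus `delay`.
    """
    max_event_time = None
    dropped = 0
    for batch in batches:
        watermark = None if max_event_time is None else max_event_time - delay
        for event_time in batch:
            if watermark is not None and event_time < watermark:
                dropped += 1  # older than the watermark: treated as late
        if batch:
            newest = max(batch)
            if max_event_time is None or newest > max_event_time:
                max_event_time = newest
    return dropped


start = datetime(2022, 4, 27, 14, 0)
delay = timedelta(minutes=10)

# Like current_timestamp(): every row of batch k carries that batch's time
# (batches assumed to run every 90 seconds, 3 rows each).
processing_time_batches = [[start + timedelta(seconds=90 * k)] * 3 for k in range(20)]
print(count_late_rows(processing_time_batches, delay))  # 0

# Hypothetical real event times: one extra batch with a tweet created
# 15 minutes before the newest event seen so far.
newest_seen = start + timedelta(seconds=90 * 19)
event_time_batches = processing_time_batches + [[newest_seen - timedelta(minutes=15)]]
print(count_late_rows(event_time_batches, delay))  # 1
```

Under this model a late-row counter stuck at 0 does not by itself show that old window state is kept forever: discarding late input rows and evicting state for windows the watermark has passed are separate mechanisms.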

After applying some transformations, I generate two columns, one with the entity (entLab) and one with its sentiment (sentNum), and apply the watermark before running the query:

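from pyspark.sql.functions import avg, count, window

# 10-minute watermark on "timestamp"; 5-minute windows sliding every 2 minutes
finalDF = resultDF.withWatermark("timestamp", "10 minutes").\
      groupBy("entLab", window("timestamp", "5 minutes", "2 minutes")).\
      agg(avg("sentNum").alias("avgSent"), count("sentNum").alias("countEnt")).\
      select("entLab", "avgSent", "countEnt")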
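
window("timestamp", "5 minutes", "2 minutes") assigns each row to every 5-minute window, starting on a 2-minute boundary, that contains its timestamp, so each row is counted in two or three overlapping windows. The plain-Python sketch below mirrors that assignment; sliding_windows is a hypothetical helper, and it assumes window starts aligned to 1970-01-01 00:00:00 UTC, which is Spark's behaviour when no startTime offset is given.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(t, size, slide):
    """Return the [start, end) windows that contain t, oldest first.

    Window starts are multiples of `slide` since the Unix epoch; assumes
    slide <= size, as in window("timestamp", "5 minutes", "2 minutes").
    """
    latest_start = t - (t - EPOCH) % slide  # most recent aligned start <= t
    windows = []
    start = latest_start
    while start + size > t:  # window [start, start + size) still covers t
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


size, slide = timedelta(minutes=5), timedelta(minutes=2)
t = datetime(2022, 4, 27, 14, 3, 30, tzinfo=timezone.utc)
for start, end in sliding_windows(t, size, slide):
    print(start.strftime("%H:%M"), "-", end.strftime("%H:%M"))
# 14:00 - 14:05
# 14:02 - 14:07
```

With the 10-minute watermark, the 14:00-14:05 window could be finalized at the earliest once an event stamped 14:15 or later has been seen. Note also that the final select drops the window column, so rows for the same entLab coming from overlapping windows would be indistinguishable in processBatch.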

query = finalDF.writeStream.queryName('treemapResult').\
      foreachBatch(processBatch).outputMode("update").\
      option("checkpointLocation", "/tmp/checkpoints").start()
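
With no trigger specified, Spark starts the next micro-batch as soon as the previous one has finished, so foreachBatch (and therefore each plot) runs at whatever pace the NLP pipelines allow, which would be consistent with the 80-90 seconds observed. The plain-Python model below approximates how start times change with a fixed processing-time interval; batch_start_times is a hypothetical helper, the durations are illustrative, and the rule that an overrunning batch is followed immediately by the next one is a simplification of Spark's scheduler.

```python
def batch_start_times(durations, interval):
    """Approximate start times (seconds) of successive micro-batches.

    durations: processing time of each batch, in seconds.
    interval:  trigger interval in seconds; 0 models the default trigger
               (the next batch starts as soon as the previous one ends).
    A batch that overruns the interval is followed immediately by the next;
    otherwise the next batch waits for the next interval boundary.
    """
    starts = []
    now = 0
    for duration in durations:
        starts.append(now)
        end = now + duration
        if interval > 0:
            next_boundary = (now // interval + 1) * interval
            now = max(end, next_boundary)
        else:
            now = end
    return starts


print(batch_start_times([85, 85, 85], 0))     # [0, 85, 170]
print(batch_start_times([85, 85, 85], 120))   # [0, 120, 240]
print(batch_start_times([85, 150, 85], 120))  # [0, 120, 270]
```

In PySpark the interval would be set with .trigger(processingTime="2 minutes") on the writer before .start(). Such batches follow the wall clock rather than window boundaries, so on its own a trigger would not guarantee one plot per finished window.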

Each call to processBatch generates a plot with the selected values.

When I run the program, memory usage stays at around 7 GB of RAM but grows very slowly, and, as mentioned above, the Spark UI shows zero rows dropped. I've tried changing the output mode to append (using complete would be the opposite of the goal), but the result is very similar.
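
The difference between the two output modes for a windowed aggregation can be modelled in a few lines of plain Python. In update mode, each micro-batch hands foreachBatch only the windows whose aggregates changed in that batch; in append mode, a window is handed over once, after the watermark has passed its end. The sketch below is a simplification: windows_for and emitted_windows are hypothetical helpers, times are integer minutes, windows are identified by their start minute, and a row count stands in for avg/count.

```python
def windows_for(t, size=5, slide=2):
    """Start minutes of the sliding windows [start, start + size) containing minute t."""
    return [s for s in range(t - size + 1, t + 1) if s % slide == 0]


def emitted_windows(batches, mode, delay=10, size=5, slide=2):
    """Per micro-batch, the window starts that would reach foreachBatch.

    batches: list of micro-batches, each a list of event times in minutes.
    mode:    "update" or "append".
    Simplification: the watermark advanced by a batch is applied at the end
    of that same batch (in Spark it takes effect in the next micro-batch).
    """
    if mode not in ("update", "append"):
        raise ValueError("mode must be 'update' or 'append'")
    state = {}  # window start -> row count (stand-in for avg/count)
    max_event = None
    outputs = []
    for batch in batches:
        watermark = None if max_event is None else max_event - delay
        updated = set()
        for t in batch:
            if watermark is not None and t < watermark:
                continue  # late row: dropped by the watermark
            for s in windows_for(t, size, slide):
                state[s] = state.get(s, 0) + 1
                updated.add(s)
            max_event = t if max_event is None else max(max_event, t)
        if max_event is not None:
            watermark = max_event - delay
        finalized = sorted(s for s in state
                           if watermark is not None and s + size <= watermark)
        outputs.append(sorted(updated) if mode == "update" else finalized)
        for s in finalized:
            del state[s]  # both modes evict state for windows behind the watermark
    return outputs


batches = [[1, 3], [6], [12], [20]]
print(emitted_windows(batches, "update"))  # [[-2, 0, 2], [2, 4, 6], [8, 10, 12], [16, 18, 20]]
print(emitted_windows(batches, "append"))  # [[], [], [], [-2, 0, 2, 4]]
```

In this toy run the window starting at minute 2 is emitted in two successive update batches, while in append mode the windows starting at -2 through 4 are emitted once, together, only when the watermark (20 - 10 = 10) has passed their ends. Since the watermark is driven by the timestamp column, with current_timestamp() that emission schedule follows processing time.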

Is there a problem with how I declare the watermark? And how could I change the query so that it generates a plot after each window finishes? Right now it generates one roughly every 80-90 seconds instead of every two minutes, the slide interval between windows.

Thank you in advance!