Posted to user@spark.apache.org by Xavi Gervilla <xa...@datapta.com> on 2022/05/09 23:02:51 UTC

Structured streaming help on releasing memory

Hi Team,

I'm developing a streaming project that ingests tweets in real time and, after applying some ML models and transformations, generates a treemap of the data. The problem I'm facing is that after each batch is processed and its window has closed, memory isn't released.



I've been able to delay the collapse to about 9 hours by increasing spark.memory.fraction to 0.7 (from the default), but regardless of how many tweets arrive, usage starts at 2GB, climbs to 4-6GB during the first batches, and then slowly oscillates between 6.98GB and 7.24GB until it collapses after several hours (on an EC2 instance with 7.7GB of RAM).



The aggregated number of updated state rows tracks the input rows as expected, but the aggregated state memory used (in bytes) is only released three times during a nine-hour run, and after each release it climbs higher than before over the following batches.
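For anyone wanting to reproduce the observation, these numbers can also be read from the application itself: in PySpark, query.lastProgress returns a dict with a "stateOperators" list carrying per-operator state metrics. A minimal sketch of a helper for that (the summarize_state name and the sample values are made up for illustration):

```python
def summarize_state(progress):
    """Extract state-store metrics from a StreamingQueryProgress dict
    (the structure returned by query.lastProgress in PySpark)."""
    if not progress:
        return None
    ops = progress.get("stateOperators", [])
    return {
        "batchId": progress.get("batchId"),
        "numRowsTotal": sum(op.get("numRowsTotal", 0) for op in ops),
        "numRowsUpdated": sum(op.get("numRowsUpdated", 0) for op in ops),
        "memoryUsedBytes": sum(op.get("memoryUsedBytes", 0) for op in ops),
    }

# Hypothetical example of the relevant fields from query.lastProgress:
sample = {
    "batchId": 42,
    "stateOperators": [
        {"numRowsTotal": 1200, "numRowsUpdated": 300,
         "memoryUsedBytes": 45_000_000},
    ],
}
print(summarize_state(sample))
```

Logging this per batch makes it easy to see whether memoryUsedBytes keeps growing while numRowsTotal stays flat.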


For reference, this is the code I use to add the timestamp column; after some transformations I apply the window and watermark to avoid processing late tweets.



from pyspark.sql.functions import current_timestamp, window, avg, count

timeStampCol = tweets.withColumn('timestamp', current_timestamp())

# apply some transformations to timeStampCol, resulting in transformationsDF

# watermark of 10 minutes and tumbling window of 5 minutes
# entLab == entities (obtained with the SparkNLP NER pretrained pipeline)
# sentNum == sentiment of the tweet (obtained with the SparkNLP sentiment pretrained pipeline)
finalDF = transformationsDF.withWatermark("timestamp", "10 minutes").\
      groupBy(window("timestamp", "5 minutes"), "entLab").\
      agg(avg("sentNum").alias("avgSent"), count("sentNum").alias("countEnt"))

query = finalDF.writeStream.queryName('treemapResult').\
      foreachBatch(processBatch).outputMode("update").\
      option("checkpointLocation", "/tmp/checkpoints").start()

# the processBatch function writes the dataframe to a .csv file



I've also tried using append mode, but the memory consumption is very similar.



Is there something wrong with the declaration of the window/watermark? What could be causing state to keep accumulating even after the 10-minute watermark has passed and the batch has been processed?
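For what it's worth, my understanding of the eviction rule is: a 5-minute tumbling window covering event time t spans [t - t % 300, t - t % 300 + 300), and Spark can only drop that window's state once the watermark (roughly the max event time seen minus the 10-minute delay) passes the window's end. A small sketch of that arithmetic, with hypothetical helper names:

```python
WINDOW_SECS = 5 * 60   # tumbling window of 5 minutes
DELAY_SECS = 10 * 60   # watermark delay of 10 minutes

def window_bounds(event_ts):
    """Tumbling-window [start, end) interval an event timestamp falls into,
    with timestamps given as seconds since the epoch."""
    start = event_ts - (event_ts % WINDOW_SECS)
    return start, start + WINDOW_SECS

def state_droppable(window_end, max_event_ts):
    """State for a window can be evicted once the watermark
    (max event time minus the allowed delay) passes the window end."""
    watermark = max_event_ts - DELAY_SECS
    return watermark >= window_end
```

Since I use current_timestamp() as the event time, the watermark should always advance, so each window's state ought to be droppable roughly 15 minutes after it opens; if that's right, hours of state growth would suggest the leak is elsewhere (e.g. in foreachBatch) rather than in the windowed aggregation itself.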



If there's any additional information that might help you understand the problem better, I'll be happy to provide it.



You all have been able to help in the past so thank you in advance.