You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Dongjoon Hyun (Jira)" <ji...@apache.org> on 2020/01/26 01:30:00 UTC

[jira] [Closed] (SPARK-30634) Delta Merge and Arbitrary Stateful Processing in Structured streaming (foreachBatch)

     [ https://issues.apache.org/jira/browse/SPARK-30634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun closed SPARK-30634.
---------------------------------

> Delta Merge and Arbitrary Stateful Processing in Structured streaming  (foreachBatch)
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-30634
>                 URL: https://issues.apache.org/jira/browse/SPARK-30634
>             Project: Spark
>          Issue Type: Question
>          Components: Examples, Spark Core, Structured Streaming
>    Affects Versions: 2.4.3
>         Environment: Spark 2.4.3 (scala 2.11.12)
> Delta: 0.5.0
> Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
> OS: Ubuntu 18.04 LTS
>  
>            Reporter: Yurii Oleynikov
>            Priority: Trivial
>         Attachments: Capture1.PNG
>
>
> Hi ,
> I have an application that makes Arbitrary Stateful Processing in Structured Streaming and used delta.merge to update delta table and faced strange behaviour:
> 1. I've noticed that logs inside implementation of {{MapGroupsWithStateFunction}}/ {{FlatMapGroupsWithStateFunction}} in my application outputted twice.
> 2. While finding a root cause I've also found that number State rows reported by Spark is also doubles.
>  
> I thought that may be there's a bug in my code, so I back to {{JavaStructuredSessionization}} from Apache Spark examples and changed it a bit. Still got same result.
> The problem happens only if I do not perform datch.DF.persist inside foreachBatch.
> {code:java}
> StreamingQuery query = sessionUpdates
>         .writeStream()
>         .outputMode("update")
>         .foreachBatch((VoidFunction2<Dataset<SessionUpdate>, Long>) (batchDf, v2) -> {
>             // following doubles number of spark state rows and causes MapGroupsWithStateFunction to log twice withport persisting
>             deltaTable.as("sessions").merge(batchDf.toDF().as("updates"), mergeExpr)
>                     .whenNotMatched().insertAll()
>                     .whenMatched()
>                     .updateAll()
>                     .execute();
>         })
>         .trigger(Trigger.ProcessingTime(10000))
>         .queryName("ACME")
>         .start(); 
> {code}
> According to [https://docs.databricks.com/_static/notebooks/merge-in-streaming.html] and [Apache spark docs|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch] there's seems to be no need to persist dataset/dataframe inside {{foreachBatch.}}
> Sample code from Apache Spark examples with delta: [JavaStructuredSessionization with Delta merge|https://github.com/yurkao/delta-merge-sss/blob/master/src/main/java/JavaStructuredSessionization.java]
>  
>  
> Appreciate your clarification.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org