You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sebastian Herold (JIRA)" <ji...@apache.org> on 2019/03/07 13:39:00 UTC

[jira] [Updated] (SPARK-27086) DataSourceV2 MicroBatchExecution commits last batch only if new batch is constructed

     [ https://issues.apache.org/jira/browse/SPARK-27086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sebastian Herold updated SPARK-27086:
-------------------------------------
    Description: 
I wanted to use the new {{DataSourceV2}} API to build a AWS SQS streaming data source which offers the new {{commit}} method of the {{MicroBatchReader}} to finally commit the message at SQS after it has been processed. If the processing of messages would fail and they got not committed, after a timeout the message would automatically reappear in SQS which is the intended behaviour without using special state storing or checkpointing.
Sadly, I noticed that an offset in the {{MicroBatchReader}} got only committed if a new batch is constructed ([see line 400 in {{MicroBatchExecution}}|https://github.com/apache/spark/blob/f7ad4ff040d39c7a55a9e01a990534e55c8178a5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L400]) which is quite strange. Especially, in my SQS example it could happen that after a first batch of messages this there is a long break before the new messages are send to SQS. This would lead to a timeout and reappearance of the SQS messages from the previous batch, because they got processed, but not committed. Therefore, I would strongly recommend to commit an offset, once it is processed! The committing should be independent from the next batch!

  was:
I wanted to use the new {{DataSourceV2}} API to build a AWS SQS streaming data source which uses the new {{commit}} method of the {{MicroBatchReader}} to finally commit the message at SQS after it has been processed. If the processing of messages would fail and they got not committed, after a timeout the message would automatically reappear in SQS which is the intended behaviour without using special state storing or checkpointing.
Sadly, I noticed that an offset in the {{MicroBatchReader}} got only committed if a new batch is constructed ([see line 400 in {{MicroBatchExecution}}|https://github.com/apache/spark/blob/f7ad4ff040d39c7a55a9e01a990534e55c8178a5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L400]) which is quite strange. Especially, in my SQS example it could happen that after a first batch of messages this there is a long break before the new messages are send to SQS. This would lead to a timeout and reappearance of the SQS messages from the previous batch, because they got processed, but not committed. Therefore, I would strongly recommend to commit an offset, once it is processed! The committing should be independent from the next batch!


> DataSourceV2 MicroBatchExecution commits last batch only if new batch is constructed
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-27086
>                 URL: https://issues.apache.org/jira/browse/SPARK-27086
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Sebastian Herold
>            Priority: Major
>              Labels: MicroBatchExecution, MicroBatchReader, Spark, Streaming, Structured, commit
>
> I wanted to use the new {{DataSourceV2}} API to build a AWS SQS streaming data source which offers the new {{commit}} method of the {{MicroBatchReader}} to finally commit the message at SQS after it has been processed. If the processing of messages would fail and they got not committed, after a timeout the message would automatically reappear in SQS which is the intended behaviour without using special state storing or checkpointing.
> Sadly, I noticed that an offset in the {{MicroBatchReader}} got only committed if a new batch is constructed ([see line 400 in {{MicroBatchExecution}}|https://github.com/apache/spark/blob/f7ad4ff040d39c7a55a9e01a990534e55c8178a5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L400]) which is quite strange. Especially, in my SQS example it could happen that after a first batch of messages this there is a long break before the new messages are send to SQS. This would lead to a timeout and reappearance of the SQS messages from the previous batch, because they got processed, but not committed. Therefore, I would strongly recommend to commit an offset, once it is processed! The committing should be independent from the next batch!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org