Posted to issues@spark.apache.org by "Vaclav Kosar (JIRA)" <ji...@apache.org> on 2018/07/26 10:38:00 UTC

[jira] [Updated] (SPARK-24647) Sink Should Return Written Offsets For ProgressReporting

     [ https://issues.apache.org/jira/browse/SPARK-24647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vaclav Kosar updated SPARK-24647:
---------------------------------
    Summary: Sink Should Return Written Offsets For ProgressReporting  (was: Sink Should Return OffsetSeqs For ProgressReporting)

> Sink Should Return Written Offsets For ProgressReporting
> --------------------------------------------------------
>
>                 Key: SPARK-24647
>                 URL: https://issues.apache.org/jira/browse/SPARK-24647
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>            Reporter: Vaclav Kosar
>            Priority: Major
>
> To track data lineage for Structured Streaming (I intend to implement this in the open-source project Spline), monitoring needs to track not only where the data was read from but also where the results were written to. To my knowledge, this would be best implemented through {{StreamingQueryProgress}} monitoring. However, batch data offsets are currently not available on the {{Sink}} interface. Implementing the proposal would also bring symmetry to the sources and sink fields of {{StreamingQueryProgress}}.
>  
> *Similar Proposals*
> Similar proposals were made in the following JIRA issues. They would not be sufficient for lineage tracking.
>  * https://issues.apache.org/jira/browse/SPARK-18258
>  * https://issues.apache.org/jira/browse/SPARK-21313
>  
> *Current State*
>  * Method {{Sink#addBatch}} returns {{Unit}}.
>  * {{StreamingQueryProgress}} reports the start and end {{offsetSeq}} for sources via the {{sourceProgress}} value, but {{sinkProgress}} only calls the sink's {{toString}} method.
> {code:java}
>   "sources" : [ {
>     "description" : "KafkaSource[Subscribe[test-topic]]",
>     "startOffset" : null,
>     "endOffset" : { "test-topic" : { "0" : 5000 }},
>     "numInputRows" : 5000,
>     "processedRowsPerSecond" : 645.3278265358803
>   } ],
>   "sink" : {
>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@9da556f"
>   }
> {code}
>  
>  
> *Proposed State*
>  * {{Sink#addBatch}} returns an {{OffsetSeq}} or {{StreamProgress}} specifying the offsets of the written batch. The Kafka producer's {{send}} method, for example, reports this through a {{RecordMetadata}} object.
>  * {{StreamingQueryProgress}} incorporates {{sinkProgress}} in a similar fashion to {{sourceProgress}}.
>  
>  
> {code:java}
>   "sources" : [ {
>     "description" : "KafkaSource[Subscribe[test-topic]]",
>     "startOffset" : null,
>     "endOffset" : { "test-topic" : { "0" : 5000 }},
>     "numInputRows" : 5000,
>     "processedRowsPerSecond" : 645.3278265358803
>   } ],
>   "sink" : {
>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@9da556f",
>     "startOffset" : null,
>     "endOffset" : { "sinkTopic" : { "0" : 333 }}
>   }
> {code}
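The proposed state above could look roughly like the following. This is only a minimal, self-contained sketch: `OffsetReportingSink`, `WrittenOffsets`, `SingleTopicSink` and `toJson` are hypothetical names invented for illustration and are not part of Spark's API, and the single-topic, single-partition layout is an assumption chosen to mirror the example JSON.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; none of these types exist in Spark.
final class SinkOffsetsSketch {

    /** Offsets written by one batch: topic -> partition -> end offset. */
    static final class WrittenOffsets {
        final Map<String, Map<Integer, Long>> endOffsets;

        WrittenOffsets(Map<String, Map<Integer, Long>> endOffsets) {
            this.endOffsets = endOffsets;
        }
    }

    /** Stand-in for a sink whose addBatch reports what it wrote instead of returning nothing. */
    interface OffsetReportingSink {
        WrittenOffsets addBatch(long batchId, List<String> rows);
    }

    /** In-memory sink that pretends to append every row to partition 0 of one topic. */
    static final class SingleTopicSink implements OffsetReportingSink {
        private final String topic;
        private long nextOffset = 0L;

        SingleTopicSink(String topic) {
            this.topic = topic;
        }

        @Override
        public WrittenOffsets addBatch(long batchId, List<String> rows) {
            // Advance the end offset by the number of rows in this batch.
            nextOffset += rows.size();
            Map<Integer, Long> partitions = new TreeMap<>();
            partitions.put(0, nextOffset);
            Map<String, Map<Integer, Long>> byTopic = new TreeMap<>();
            byTopic.put(topic, partitions);
            return new WrittenOffsets(byTopic);
        }
    }

    /** Renders offsets in the shape of the "endOffset" field; topic names are not escaped. */
    static String toJson(WrittenOffsets offsets) {
        StringBuilder sb = new StringBuilder("{");
        boolean firstTopic = true;
        for (Map.Entry<String, Map<Integer, Long>> t : offsets.endOffsets.entrySet()) {
            if (!firstTopic) sb.append(",");
            firstTopic = false;
            sb.append("\"").append(t.getKey()).append("\":{");
            boolean firstPartition = true;
            for (Map.Entry<Integer, Long> p : t.getValue().entrySet()) {
                if (!firstPartition) sb.append(",");
                firstPartition = false;
                sb.append("\"").append(p.getKey()).append("\":").append(p.getValue());
            }
            sb.append("}");
        }
        return sb.append("}").toString();
    }
}
```

With this shape, a progress reporter could fill the sink's `endOffset` field from the value returned by `addBatch` rather than relying on `toString` alone.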
>  
> *Implementation*
>  * PR submitters: Most likely me and [~wajda], as soon as the discussion concludes positively.
>  * {{Sinks}}: Modify all sinks to conform to the new interface or to return dummy values.
>  * {{ProgressReporter}}: Merge offsets from different batches properly, similarly to how it is done for sources.
>  
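One possible reading of "merge offsets from different batches" in the implementation notes above is to keep, per partition, the highest end offset seen across the batches being reported. The sketch below illustrates only that assumption. `OffsetMergeSketch` and `mergeEndOffsets` are hypothetical names, and this is not how Spark's `ProgressReporter` is known to merge source offsets.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; not Spark's ProgressReporter logic.
final class OffsetMergeSketch {

    /** Merges per-batch end offsets (partition -> offset), keeping the highest offset per partition. */
    static Map<Integer, Long> mergeEndOffsets(List<Map<Integer, Long>> batches) {
        Map<Integer, Long> merged = new TreeMap<>();
        for (Map<Integer, Long> batch : batches) {
            for (Map.Entry<Integer, Long> e : batch.entrySet()) {
                // Insert the offset, or keep the larger of the stored and new values.
                merged.merge(e.getKey(), e.getValue(), Long::max);
            }
        }
        return merged;
    }
}
```

Partitions that receive no writes in a later batch keep the offset reported by the earlier one, so the merged map always covers every partition written so far.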



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)