You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2018/08/19 18:05:00 UTC

[jira] [Assigned] (SPARK-24647) Sink Should Return Writen Offsets For ProgressReporting

     [ https://issues.apache.org/jira/browse/SPARK-24647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-24647:
------------------------------------

    Assignee: Apache Spark

> Sink Should Return Writen Offsets For ProgressReporting
> -------------------------------------------------------
>
>                 Key: SPARK-24647
>                 URL: https://issues.apache.org/jira/browse/SPARK-24647
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>            Reporter: Vaclav Kosar
>            Assignee: Apache Spark
>            Priority: Major
>
> To be able to track data lineage for Structured Streaming (I intend to implement this to Open Source Project Spline), the monitoring needs to be able to not only to track where the data was read from but also where results were written to. This could be to my knowledge best implemented using monitoring {{StreamingQueryProgress}}. However currently written data offsets are not available on {{Sink}} or {{StreamWriter}} interface. Implementing as proposed would also bring symmetry to {{StreamingQueryProgress}} fields sources and sink.
>  
> *Similar Proposals*
> Made in following jiras. These would not be sufficient for lineage tracking.
>  * https://issues.apache.org/jira/browse/SPARK-18258
>  * https://issues.apache.org/jira/browse/SPARK-21313
>  
> *Current State*
>  * Method {{Sink#addBatch}} returns {{Unit}}.
>  * Object {{WriterCommitMessage}} does not carry any progress information about comitted rows.
>  * {{StreamingQueryProgress}} reports {{offsetSeq}} start and end using {{sourceProgress}} value but {{sinkProgress}} only calls {{toString}} method.
> {code:java}
>   "sources" : [ {
>     "description" : "KafkaSource[Subscribe[test-topic]]",
>     "startOffset" : null,
>     "endOffset" : { "test-topic" : { "0" : 5000 }},
>     "numInputRows" : 5000,
>     "processedRowsPerSecond" : 645.3278265358803
>   } ],
>   "sink" : {
>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@9da556f"
>   }
> {code}
>  
>  
> *Proposed State*
>  * Implement support only for v2 sinks as those are to use used in future.
>  * {{WriterCommitMessage}} to hold optional min and max offset information of commited rows e.g. Kafka does it by returning {{RecordMetadata}} object from {{send}} method.
>  * {{StreamingQueryProgress}} incorporate {{sinkProgress}} in similar fashion as {{sourceProgress}}.
>  
>  
> {code:java}
>   "sources" : [ {
>     "description" : "KafkaSource[Subscribe[test-topic]]",
>     "startOffset" : null,
>     "endOffset" : { "test-topic" : { "0" : 5000 }},
>     "numInputRows" : 5000,
>     "processedRowsPerSecond" : 645.3278265358803
>   } ],
>   "sink" : {
>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@9da556f",
>    "startOffset" : null,
>     "endOffset" { "sinkTopic": { "0": 333 }}
>   }
> {code}
>  
> *Implementation*
> * PR submitters: Me and [~wajda] as soon as prerequisite jira is merged.
>  * {{Sinks}}: Modify all v2 sinks to conform a new interface or return dummy values.
>  * {{ProgressReporter}}: Merge offsets from different batches properly, similarly to how it is done for sources.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org