You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2018/06/28 00:51:00 UTC

[jira] [Assigned] (SPARK-18258) Sinks need access to offset representation

     [ https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-18258:
------------------------------------

    Assignee: Apache Spark

> Sinks need access to offset representation
> ------------------------------------------
>
>                 Key: SPARK-18258
>                 URL: https://issues.apache.org/jira/browse/SPARK-18258
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>            Reporter: Cody Koeninger
>            Assignee: Apache Spark
>            Priority: Major
>
> Transactional "exactly-once" semantics for output require storing an offset identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as the results are and I'm not locked in to a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for the starting and ending offsets (either the offsets themselves, or the SPARK-17829 string/json representation).  That would be an API change, but if there's another way to map batch ids to offset representations without changing the Sink api that would work as well.  
> I'm assuming we don't need the same level of access to offsets throughout a job as e.g. the Kafka dstream gives, because Sinks are the main place that should need them.
> After SPARK-17829 is complete and offsets have a .json method, an api for this ticket might look like
> {code}
> trait Sink {
>   def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: OffsetSeq): Unit
> {code}
> where start and end were provided by StreamExecution.runBatch using committedOffsets and availableOffsets.  
> I'm not 100% certain that the offsets in the seq could always be mapped back to the correct source when restarting complicated multi-source jobs, but I think it'd be sufficient.  Passing the string/json representation of the seq instead of the seq itself would probably be sufficient as well, but the convention of rendering a None as "-" in the json is maybe a little idiosyncratic to parse, and the constant defining that is private.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org