Posted to issues@spark.apache.org by "Michael Armbrust (JIRA)" <ji...@apache.org> on 2016/11/04 19:57:58 UTC

[jira] [Commented] (SPARK-18258) Sinks need access to offset representation

    [ https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15637491#comment-15637491 ] 

Michael Armbrust commented on SPARK-18258:
------------------------------------------

I agree that we don't want to lock people in, which is why a goal of [SPARK-17829] was to make the offset representation user-readable.

Given that the offsets are accessible in this way (we should probably document this, and make it a long-term contract), I don't think we want to widen the Sink API to expose the internal details of the various sources. Exposing more than we need leaks details and could lead to a more brittle system. This is why I think it's safer to use {{batchId}} as a proxy to achieve transactional semantics.

Consider the case where some source returns the offsets {{a: 1, b: 2}}, but upon recovery it returns {{b: 2, a: 1}}. This is a little weird, but as long as it implements {{getBatch}} correctly, there are no correctness issues in that Source. However, with this proposal, the sink is now responsible for reasoning about equality of these representations. In contrast, it's trivial to reason about equality in the current API, since a {{batchId}} is just a number.
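
To make the {{batchId}}-as-proxy idea concrete, here is a rough sketch of an idempotent sink. The {{TransactionalStore}} interface is a hypothetical stand-in for whatever transactional storage the sink writes to, not a real library API:

{code:scala}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.streaming.Sink

// Hypothetical transactional storage interface -- a stand-in, not a real API.
trait Txn {
  def writeRows(rows: Array[Row]): Unit
  def setLastCommittedBatchId(batchId: Long): Unit
}
trait TransactionalStore {
  def lastCommittedBatchId(): Option[Long]
  def withTransaction(body: Txn => Unit): Unit
}

class IdempotentSink(store: TransactionalStore) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // If this batch was already committed before a failure/restart, skip it.
    if (store.lastCommittedBatchId().exists(_ >= batchId)) return

    // Write the results and the batchId in the same transaction, so replaying
    // the same batchId after recovery cannot produce duplicates.
    store.withTransaction { txn =>
      txn.writeRows(data.collect())
      txn.setLastCommittedBatchId(batchId)
    }
  }
}
{code}

On recovery the engine re-runs the last batch with the same {{batchId}}, so the check above turns the replay into a no-op, without the sink ever having to understand the source's offset format.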

> Sinks need access to offset representation
> ------------------------------------------
>
>                 Key: SPARK-18258
>                 URL: https://issues.apache.org/jira/browse/SPARK-18258
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>            Reporter: Cody Koeninger
>
> Transactional "exactly-once" semantics for output require storing an offset identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as the results are, and I'm not locked into a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for the starting and ending offsets (either the offsets themselves, or the SPARK-17829 string/JSON representation); see the sketch below. That would be an API change, but if there's another way to map batch IDs to offset representations without changing the Sink API, that would work as well.
> I'm assuming we don't need the same level of access to offsets throughout a job as, e.g., the Kafka DStream gives, because Sinks are the main place that should need them.
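
For reference, a hedged sketch of what the proposed widening of Sink.addBatch might look like; the parameter names and the JSON-string offset representation are assumptions based on the description above, not an actual Spark API:

{code:scala}
import org.apache.spark.sql.DataFrame

// Sketch of the *proposed* (not actual) API change: pass the start/end offsets
// -- here as the SPARK-17829 JSON string representation -- into addBatch so the
// sink can store them in the same transaction as the results.
trait OffsetAwareSink {
  def addBatch(batchId: Long,
               data: DataFrame,
               startOffsetsJson: String,
               endOffsetsJson: String): Unit
}
{code}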



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
