Posted to issues@spark.apache.org by "Richard Yu (JIRA)" <ji...@apache.org> on 2018/06/27 11:13:00 UTC

[jira] [Comment Edited] (SPARK-18258) Sinks need access to offset representation

    [ https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524921#comment-16524921 ] 

Richard Yu edited comment on SPARK-18258 at 6/27/18 11:12 AM:
--------------------------------------------------------------

Just a question: I noticed that in {{KafkaSink}}'s implementation of {{addBatch}}, there is a preexisting schema that needs to be followed. More specifically, a {{ProducerRecord}} (provided by Kafka) is sent to the producer with room only for the topic name, a key, and a value. However, there does not appear to be any way to also export the {{start}} and {{end}} {{OffsetSeq}}s to Kafka. So am I right to assume that the newly added data is to be used for checkpointing purposes only?
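
For context, the per-row write under that schema looks roughly like the sketch below (illustrative only, not the actual Kafka sink internals; {{sendRow}} is a made-up name). It shows why the offsets have nowhere to go in the record itself:

{code}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.Row

object KafkaSinkSketch {
  // Each row is flattened into a ProducerRecord, which only has room for a
  // topic, a key, and a value -- there is no slot for batch or offset metadata.
  def sendRow(producer: KafkaProducer[Array[Byte], Array[Byte]],
              topic: String, row: Row): Unit = {
    val key   = row.getAs[Array[Byte]]("key")     // key may be null in the sink schema
    val value = row.getAs[Array[Byte]]("value")
    producer.send(new ProducerRecord(topic, key, value))
    // The start/end OffsetSeqs would have to be serialized into `value` itself
    // or stored elsewhere (e.g. the checkpoint), since the record cannot carry them.
  }
}
{code}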


> Sinks need access to offset representation
> ------------------------------------------
>
>                 Key: SPARK-18258
>                 URL: https://issues.apache.org/jira/browse/SPARK-18258
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>            Reporter: Cody Koeninger
>            Priority: Major
>
> Transactional "exactly-once" semantics for output require storing an offset identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as the results are and I'm not locked in to a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for the starting and ending offsets (either the offsets themselves, or the SPARK-17829 string/json representation). That would be an API change, but if there's another way to map batch IDs to offset representations without changing the Sink API, that would work as well.
> I'm assuming we don't need the same level of access to offsets throughout a job as e.g. the Kafka dstream gives, because Sinks are the main place that should need them.
> After SPARK-17829 is complete and offsets have a .json method, an API for this ticket might look like
> {code}
> trait Sink {
>   def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: OffsetSeq): Unit
> }
> {code}
> where start and end would be provided by StreamExecution.runBatch using committedOffsets and availableOffsets.
> I'm not 100% certain that the offsets in the seq could always be mapped back to the correct source when restarting complicated multi-source jobs, but I think it'd be sufficient.  Passing the string/json representation of the seq instead of the seq itself would probably be sufficient as well, but the convention of rendering a None as "-" in the json is maybe a little idiosyncratic to parse, and the constant defining that is private.
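
For what it's worth, here is a rough, hypothetical sketch of a sink using the proposed signature to keep offsets transactional with the results (the class name, table layout, and the OffsetSeq json rendering are all assumptions, not existing Spark code):

{code}
import java.sql.DriverManager
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.OffsetSeq

// Hypothetical sink: the batch results and the batch's offsets are committed in
// the same JDBC transaction, so a restart can recover the offsets from the
// results store instead of relying only on Spark's checkpoint directory.
class TransactionalJdbcSink(url: String) {
  def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: OffsetSeq): Unit = {
    val conn = DriverManager.getConnection(url)
    conn.setAutoCommit(false)
    try {
      // 1. write the rows of `data` to the results table (omitted here)
      // 2. record the offsets in the same transaction; .json assumes the
      //    SPARK-17829 string/json representation is exposed on the seq
      val stmt = conn.prepareStatement(
        "INSERT INTO batch_offsets (batch_id, start_json, end_json) VALUES (?, ?, ?)")
      stmt.setLong(1, batchId)
      stmt.setString(2, start.json)
      stmt.setString(3, end.json)
      stmt.executeUpdate()
      conn.commit()
    } catch {
      case e: Exception => conn.rollback(); throw e
    } finally {
      conn.close()
    }
  }
}
{code}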



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org