Posted to issues@spark.apache.org by "Michael Armbrust (JIRA)" <ji...@apache.org> on 2016/11/03 20:01:59 UTC

[jira] [Commented] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

    [ https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15634061#comment-15634061 ] 

Michael Armbrust commented on SPARK-17937:
------------------------------------------

I'm going to pull this out from the parent JIRA as I don't think it blocks basic Kafka usage, and there are enough things here to warrant several subtasks of their own.  I see a couple of possible concrete action items here:
 - make sure what we have is documented clearly, possibly even with a comparison to {{auto.offset.reset}} for those more familiar with Kafka
 - what to do on data loss: it seems we are missing the ability to do {{latest}}.  I'd be fine with changing this to {{onDataLoss = fail,earliest,latest}} with fail being the default.  It would be nice to keep compatibility with the old option, but that is minor.
 - how to handle new partitions: if possible I'd like to lump this into the {{onDataLoss}} setting.  When a new partition appears mid-query, the default should be to process all of it (if that can be assured).  If it can't, because of downtime and data having already aged out, I'd like to error by default, but the user should be able to pick earliest or latest.
 - timestamps: sounds awesome; should probably be its own feature JIRA
 - integration with the Kafka commit log: also could probably be its own feature JIRA.  I'd also like to hear requests from users on what they need here.  Is it monitoring?  Is it migrating queries to Structured Streaming?  My big concern is that it might be confusing, since we can't use the same transactional tricks we use for our own checkpoint commit log, and I don't want users to lose exactly-once guarantees without understanding why.
 - X offsets: also its own feature JIRA.  I agree it only makes sense for topics that are uniformly hash partitioned (all of mine are).  Maybe we skip this if we get timestamps soon enough.
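The {{onDataLoss}} semantics proposed above can be sketched as a small decision function.  To be clear, this is purely illustrative: the option name, the event names, and this resolution logic are assumptions drawn from the discussion, not an existing Spark API.

```python
# Hypothetical sketch of the unified "onDataLoss" setting proposed above.
# Neither the option name nor this logic exists in Spark; it only
# illustrates the semantics under discussion.

def resolve_offset(event, on_data_loss="fail"):
    """Return where the source should resume, or raise if the query must fail.

    event:        "new_partition"      - partition appeared mid-query, data intact
                  "new_partition_aged" - partition appeared, but data already aged out
                  "out_of_range"       - checkpointed offset no longer in the log
    on_data_loss: "fail" | "earliest" | "latest" ("fail" is the proposed default)
    """
    if on_data_loss not in ("fail", "earliest", "latest"):
        raise ValueError("unknown onDataLoss value: %s" % on_data_loss)
    if event == "new_partition":
        # All of the new partition's data is still present: process it fully.
        return "earliest"
    if event in ("new_partition_aged", "out_of_range"):
        if on_data_loss == "fail":
            raise RuntimeError("data lost and onDataLoss=fail: killing the query")
        return on_data_loss
    raise ValueError("unknown event: %s" % event)
```

Under this sketch, {{resolve_offset("out_of_range", "latest")}} resumes from the latest offset, while the proposed default fails the query on any lost data.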

> Clarify Kafka offset semantics for Structured Streaming
> -------------------------------------------------------
>
>                 Key: SPARK-17937
>                 URL: https://issues.apache.org/jira/browse/SPARK-17937
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Structured Streaming
>            Reporter: Cody Koeninger
>
> Possible events for which offsets are needed:
> # New partition is discovered
> # Offset out of range (i.e., data has been lost).   It's possible to separate this into offset too small and offset too large, but I'm not sure it matters for us.
> Possible sources of offsets:
> # *Earliest* position in log
> # *Latest* position in log
> # *Fail* and kill the query
> # *Checkpoint* position
> # *User specified* per topicpartition
> # *Kafka commit log*.  Currently unsupported.  This means users who want to migrate from existing kafka jobs need to jump through hoops.  Even if we never want to support it, as soon as we take on SPARK-17815 we need to make sure Kafka commit log state is clearly documented and handled.
> # *Timestamp*.  Currently unsupported.  This could be supported with the old, inaccurate Kafka time API, or the upcoming time index
> # *X offsets* before or after latest / earliest position.  Currently unsupported.  I think the semantics of this are super unclear by comparison with timestamp, given that Kafka doesn't have a single range of offsets.
> Currently allowed pre-query configuration, all "ORs" are exclusive:
> # startingOffsets: *earliest* OR *latest* OR *User specified* json per topicpartition  (SPARK-17812)
> # failOnDataLoss: true (which implies *Fail* above) OR false (which implies *Earliest* above).  In general, I see no reason this couldn't also specify *Latest* as an option.
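For concreteness, the *User specified* per-topicpartition form from SPARK-17812 is a JSON map of topic to partition to offset, with -2 and -1 serving as sentinels for earliest and latest.  A minimal sketch of building that string in plain Python (the topic names and concrete offsets here are made up):

```python
import json

# Sentinel offsets used by the Kafka source's startingOffsets JSON:
# -2 = earliest, -1 = latest.  Topic/partition values below are made up.
EARLIEST, LATEST = -2, -1

starting_offsets = json.dumps({
    "topic1": {"0": 23, "1": EARLIEST},
    "topic2": {"0": LATEST},
})

# This string would then be passed to the Kafka source, e.g.:
#   spark.readStream.format("kafka")
#        .option("startingOffsets", starting_offsets)
```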
> Possible lifecycle times in which an offset-related event may happen:
> # At initial query start
> #* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If startingOffsets is *User specified* per topicpartition, and the new partition isn't in the map, *Fail*.  Note that this is effectively indistinguishable from a new partition appearing during the query, because partitions may have changed between pre-query configuration and query start; but we treat it differently, and users in this case are SOL
> #* Offset out of range on driver: we don't technically have behavior for this case yet.  We could use the value of failOnDataLoss, but it's possible people may want to know at startup that something was wrong, even if they're OK with earliest for a during-query out-of-range
> #* Offset out of range on executor: seems like it should be *Fail* or *Earliest*, based on failOnDataLoss.  But it looks like this setting is currently ignored, and the executor will just fail...
> # During query
> #* New partition: *Earliest*, only.  This seems to be by fiat; I see no reason this can't be configurable.
> #* Offset out of range on driver:  this _probably_ doesn't happen, because we're doing explicit seeks to the latest position
> #* Offset out of range on executor:  ?
> # At query restart 
> #* New partition: *Checkpoint*, falling back to *Earliest*.  Again, no reason the fallback couldn't be configurable to *Latest*
> #* Offset out of range on driver:   this _probably_ doesn't happen, because we're doing explicit seeks to the specified position
> #* Offset out of range on executor:  ?
> I've probably missed something, chime in.
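The lifecycle/event matrix above can be summarized in one lookup table.  The sketch below encodes the thread's current understanding, with {{None}} marking the cases the description leaves open with a "?"; the key names are made up for the sketch.

```python
# Sketch of the lifecycle/event matrix from the issue description.
# None marks the cases left open ("?") in the original text.
CURRENT_BEHAVIOR = {
    ("initial_start", "new_partition"):
        "startingOffsets; Fail if missing from a user-specified map",
    ("initial_start", "out_of_range_driver"):
        "undefined (no behavior yet; could reuse failOnDataLoss)",
    ("initial_start", "out_of_range_executor"):
        "should follow failOnDataLoss, but setting is ignored; executor fails",
    ("during_query", "new_partition"):
        "Earliest, only (not configurable)",
    ("during_query", "out_of_range_driver"):
        "probably cannot happen (explicit seek to latest)",
    ("during_query", "out_of_range_executor"): None,
    ("restart", "new_partition"):
        "Checkpoint, falling back to Earliest (fallback not configurable)",
    ("restart", "out_of_range_driver"):
        "probably cannot happen (explicit seek to specified position)",
    ("restart", "out_of_range_executor"): None,
}

# The open questions are exactly the executor out-of-range cases after start.
open_questions = [k for k, v in CURRENT_BEHAVIOR.items() if v is None]
```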



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
