Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2014/03/04 22:23:42 UTC

[jira] [Updated] (SAMZA-157) Offset default behavior for streams

     [ https://issues.apache.org/jira/browse/SAMZA-157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini updated SAMZA-157:
----------------------------------

    Attachment: SAMZA-157.0.patch

Attaching a patch. RB at:

https://reviews.apache.org/r/18711/

Changes:

1. Added currentOffset and nextOffset to IncomingMessageEnvelope.
2. Added an OffsetType enum (oldest, newest, upcoming) to SystemStreamMetadata, and use it everywhere.
3. Updated a bunch of tests to reflect the new API.
4. Added an OffsetManager, which loads initial offsets, keeps track of offsets over time, and checkpoints them using the CheckpointManager. TaskInstances use this object to manage their offsets.
5. Added systems.%s.streams.%s.samza.offset.default and systems.%s.samza.offset.default to config. Value can be "oldest" or "newest". Defaults to "newest" if not defined.
6. Updated SamzaContainer to load the default offset types, and wire in OffsetManager.
7. Updated TaskInstance to use the OffsetManager.
8. Cleaned up BootstrappingChooser to always use "upcoming", since we have the upcoming offset available in the IncomingMessageEnvelope now.
9. Added TestOffsetManager.
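
To illustrate (5), here is a minimal sketch of how the two config keys might be resolved for a single stream. It is written in Python for brevity and is not the code in the patch. The function name resolve_offset_default is hypothetical, and the precedence (the stream-level key overriding the system-level key) is an assumption; only the key names, the allowed values, and the "newest" fallback come from the list above.

```python
from typing import Mapping

# Values the change list says are accepted for samza.offset.default.
VALID_DEFAULTS = ("oldest", "newest")


def resolve_offset_default(config: Mapping[str, str], system: str, stream: str) -> str:
    """Return the default offset type for one stream of one system (hypothetical helper)."""
    stream_key = f"systems.{system}.streams.{stream}.samza.offset.default"
    system_key = f"systems.{system}.samza.offset.default"
    # Assumption: the stream-level key wins over the system-level key.
    # If neither key is defined, fall back to "newest".
    value = config.get(stream_key, config.get(system_key, "newest"))
    if value not in VALID_DEFAULTS:
        raise ValueError(f"unsupported offset default: {value!r}")
    return value
```

For example, with only systems.kafka.samza.offset.default=oldest set, every stream of the "kafka" system would resolve to "oldest" under these assumptions.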

For (1), I initially used an alternative approach: I changed SystemAdmin to have a getNextOffset(Map[SystemStreamPartition, String] offsets) API. That approach seemed hacky, so I switched to the currentOffset/nextOffset API in IncomingMessageEnvelope. The main trade-off I see is that consumers now MUST know the upcoming offset for a given message at the time that the message is read. This is fine for Kafka, and should be fine for any system with incremental offsets (nextOffset = currentOffset + 1). The only case I could come up with where it would be a problem is a system whose offsets are GUIDs generated at the time of write, since the next GUID can't be known when the current message is read.
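
A rough sketch of the trade-off described for (1), in Python rather than Samza's own code. Envelope, read_incremental, and OffsetTracker are hypothetical stand-ins (not IncomingMessageEnvelope or OffsetManager themselves). Offsets are kept as strings, and the consumer fills in the upcoming offset at read time, which is trivial when offsets are incremental.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class Envelope:
    """Hypothetical stand-in for an envelope carrying both offsets."""
    partition: int
    current_offset: str  # offset of this message
    next_offset: str     # upcoming offset, known when the message is read
    message: bytes


def read_incremental(partition: int, offset: int, message: bytes) -> Envelope:
    # For incremental offsets the upcoming offset is simply current + 1.
    return Envelope(partition, str(offset), str(offset + 1), message)


class OffsetTracker:
    """Hypothetical tracker: remembers the upcoming offset per partition."""

    def __init__(self) -> None:
        self._upcoming: Dict[int, str] = {}

    def update(self, envelope: Envelope) -> None:
        self._upcoming[envelope.partition] = envelope.next_offset

    def upcoming(self, partition: int) -> Optional[str]:
        # None means no message has been seen for this partition yet.
        return self._upcoming.get(partition)
```

A system with write-time GUID offsets could not implement read_incremental this way, because next_offset would not be computable from current_offset.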

> Offset default behavior for streams
> -----------------------------------
>
>                 Key: SAMZA-157
>                 URL: https://issues.apache.org/jira/browse/SAMZA-157
>             Project: Samza
>          Issue Type: Sub-task
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>         Attachments: SAMZA-157.0.patch
>
>
> Introduce a systems.<system name>.streams.<stream name>.samza.offset.default configuration, which specifies what to do when no checkpoint exists for an input topic. This is similar to Kafka's auto.offset.reset setting. Developers will be able to specify "oldest", "latest", or "fail".
> We should also add the ability to override offsets for specific stream partitions. Something like:
> {noformat}
> systems.<system name>.streams.<stream name>.samza.force.offsets=0:123,1:123,2:123
> {noformat}
> The format I'm proposing is:
> {noformat}
> <partition string>:<force offset>,...
> {noformat}
> This is obviously dependent on offsets not having ':' or ',' in them, which I think is a safe assumption.
> This setting would force the system consumer to be registered with the specified offset for the given SSP (ignoring both the checkpoint, if it exists, and the samza.reset.offset setting).
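
For reference, the proposed samza.force.offsets value format from the quoted description could be parsed roughly as follows. This is a Python sketch only: parse_forced_offsets is a hypothetical name, and the strict error handling is an assumption. It relies on the same premise as the proposal, namely that offsets contain neither ':' nor ','.

```python
from typing import Dict


def parse_forced_offsets(value: str) -> Dict[str, str]:
    """Parse '<partition string>:<force offset>,...' into {partition: offset}."""
    forced: Dict[str, str] = {}
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing comma or an empty value
        partition, sep, offset = entry.partition(":")
        # Assumption: reject anything that is not exactly one partition and one offset.
        if not sep or not partition or not offset or ":" in offset:
            raise ValueError(f"malformed forced offset entry: {entry!r}")
        forced[partition] = offset
    return forced
```

With the example value from the description, "0:123,1:123,2:123", this yields a mapping from partitions "0", "1", and "2" to offset "123".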


