Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2015/03/17 00:25:39 UTC

[jira] [Commented] (SAMZA-603) Expose more consumer controls to developers

    [ https://issues.apache.org/jira/browse/SAMZA-603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364196#comment-14364196 ] 

Chris Riccomini commented on SAMZA-603:
---------------------------------------

For these two:

* Add new input streams after a job has started.
* Stop consuming a stream.

It seems like either a StreamTask *or* a JobCoordinator might want to do this. It would be useful to have some way to poll a system for new SystemStreams (or new SystemStreamPartitions for an existing SystemStream) that match a regex or a similar pattern. A StreamTask might also want to add new input streams in response to control messages, remote store state, etc.
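The regex-based polling idea could look roughly like the following minimal sketch. It assumes that the caller already has the list of stream names a system currently reports, for example from that system's metadata API (fetching them is out of scope here). RegexStreamDiscovery and newMatches are hypothetical names, not part of the Samza API.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

/**
 * Hypothetical helper: given the stream names a system currently reports and the
 * streams the job already consumes, return the ones that match a regex and are new.
 */
class RegexStreamDiscovery {
  private final Pattern pattern;

  RegexStreamDiscovery(String regex) {
    this.pattern = Pattern.compile(regex);
  }

  /** Streams that fully match the pattern but are not yet in currentInputs, sorted by name. */
  Set<String> newMatches(Set<String> availableStreams, Set<String> currentInputs) {
    Set<String> result = new TreeSet<>();
    for (String stream : availableStreams) {
      if (pattern.matcher(stream).matches() && !currentInputs.contains(stream)) {
        result.add(stream);
      }
    }
    return result;
  }
}
```

A JobCoordinator or StreamTask would call newMatches periodically and treat any non-empty result as a set of streams to add.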

Adding or removing a stream could be done by having the JobCoordinator write an updated task.inputs config into the coordinator stream, with the stream added to or removed from the list.
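The config rewrite itself is simple string manipulation, because task.inputs is a comma-separated list of system.stream names (for example `kafka.PageViewEvent,kafka.AdClick`). The sketch below shows only that step. TaskInputsEditor is a hypothetical name, and the actual write of the new value into the coordinator stream is deliberately omitted.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/**
 * Sketch of the task.inputs rewrite a JobCoordinator might do before writing the
 * new value to the coordinator stream (the write itself is not shown).
 */
final class TaskInputsEditor {
  private TaskInputsEditor() {}

  /** Parses a comma-separated task.inputs value, preserving order and dropping blanks. */
  static Set<String> parse(String taskInputs) {
    Set<String> streams = new LinkedHashSet<>();
    for (String s : taskInputs.split(",")) {
      String trimmed = s.trim();
      if (!trimmed.isEmpty()) {
        streams.add(trimmed);
      }
    }
    return streams;
  }

  /** Returns task.inputs with systemStream appended; a no-op if it is already present. */
  static String addInput(String taskInputs, String systemStream) {
    Set<String> streams = parse(taskInputs);
    streams.add(systemStream);
    return String.join(",", streams);
  }

  /** Returns task.inputs with systemStream removed; a no-op if it is absent. */
  static String removeInput(String taskInputs, String systemStream) {
    Set<String> streams = parse(taskInputs);
    streams.remove(systemStream);
    return String.join(",", streams);
  }
}
```

Using a LinkedHashSet keeps the original ordering and makes repeated add/remove requests idempotent. That seems desirable if the coordinator might replay a request.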

Pausing can already be done today by having the MessageChooser decide to stop choosing messages for a given SSP. Exposing the MessageChooser via the SamzaContainerContext (or TaskContext) would allow StreamTasks to call arbitrary methods on the chooser, including "pause".
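Pausing via the chooser could look roughly like the following sketch. To keep it self-contained it uses stand-in types: Envelope with a String SSP key replaces IncomingMessageEnvelope/SystemStreamPartition. It also only loosely mirrors the update()/choose() shape of MessageChooser. The pause/resume methods are hypothetical, i.e. the kind of call a StreamTask might make if the chooser were exposed through its context. Envelopes from a paused SSP are held back, not dropped.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/** Stand-in for an incoming message envelope; only the SSP key matters here. */
final class Envelope {
  final String ssp;      // stand-in for a SystemStreamPartition
  final String payload;

  Envelope(String ssp, String payload) {
    this.ssp = ssp;
    this.payload = payload;
  }
}

/** Hypothetical chooser with pause/resume controls; paused envelopes stay buffered. */
final class PausableChooser {
  private final Deque<Envelope> buffered = new ArrayDeque<>();
  private final Set<String> paused = new HashSet<>();

  void update(Envelope envelope) {
    buffered.addLast(envelope);
  }

  void pause(String ssp) {
    paused.add(ssp);
  }

  void resume(String ssp) {
    paused.remove(ssp);
  }

  /** Returns the oldest buffered envelope from a non-paused SSP, or null if none. */
  Envelope choose() {
    Iterator<Envelope> it = buffered.iterator();
    while (it.hasNext()) {
      Envelope e = it.next();
      if (!paused.contains(e.ssp)) {
        it.remove();
        return e;
      }
    }
    return null;
  }
}
```

As we understand the container, it hands the chooser at most one outstanding envelope per SSP. If so, holding that envelope back should also stop further delivery from the paused SSP without any change to the SystemConsumer.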

Overriding offsets can be achieved by writing checkpoint messages to the coordinator stream. It could also be achieved by making the SystemConsumer API mutable after start() is called, so that a StreamTask could tell it to override its fetch offsets to some prior point. Another alternative would be to keep the SystemConsumer API immutable and to build a completely *new* SystemConsumer whenever one of its offsets is changed. This seems a bit safer, since it would discard any messages that the old consumer had already consumed but that might still be sitting in BlockingEnvelopeMap.
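The "rebuild instead of mutate" option could be sketched as follows. An offset override never touches the running consumer. Instead, the holder stops the old consumer (whose buffered messages go away with it) and builds a fresh one from the new offset. OffsetConsumer, SimulatedConsumer and RebuildingConsumerHolder are hypothetical stand-ins, not Samza's SystemConsumer API. Offsets are modeled as plain longs, and the buffer is a stand-in for something like BlockingEnvelopeMap.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;

/** Hypothetical minimal consumer: starts at an offset and buffers fetched messages. */
interface OffsetConsumer {
  void start();
  void stop();
  Deque<Long> buffer();   // stand-in for messages buffered in something like BlockingEnvelopeMap
}

/** Toy consumer: on start() it "fetches" the next three offsets into its buffer. */
final class SimulatedConsumer implements OffsetConsumer {
  private final long startOffset;
  private final Deque<Long> buffer = new ArrayDeque<>();
  private boolean running;

  SimulatedConsumer(long startOffset) {
    this.startOffset = startOffset;
  }

  public void start() {
    running = true;
    for (long o = startOffset; o < startOffset + 3; o++) {
      buffer.addLast(o);
    }
  }

  public void stop() {
    running = false;
    buffer.clear();       // anything fetched but not yet processed is discarded
  }

  public Deque<Long> buffer() {
    return buffer;
  }

  boolean isRunning() {
    return running;
  }
}

/** Keeps consumers immutable: an offset override replaces the consumer instead of mutating it. */
final class RebuildingConsumerHolder {
  private final Function<Long, OffsetConsumer> factory;
  private OffsetConsumer current;

  RebuildingConsumerHolder(Function<Long, OffsetConsumer> factory, long startOffset) {
    this.factory = factory;
    this.current = factory.apply(startOffset);
    this.current.start();
  }

  void overrideOffset(long newOffset) {
    current.stop();                        // old consumer and its buffered messages are dropped
    current = factory.apply(newOffset);    // fresh consumer positioned at the new offset
    current.start();
  }

  OffsetConsumer current() {
    return current;
  }
}
```

Because stale messages can only live in a consumer that has already been stopped, nothing fetched from the old position can leak through after a rewind or fast forward.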

> Expose more consumer controls to developers
> -------------------------------------------
>
>                 Key: SAMZA-603
>                 URL: https://issues.apache.org/jira/browse/SAMZA-603
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.9.0
>            Reporter: Chris Riccomini
>             Fix For: 0.10.0
>
>
> Several overlapping feature requests have emerged. People want the ability to programmatically:
> # Add new input streams after a job has started.
> # Pause consuming a stream for a period of time.
> # Stop consuming a stream.
> # Override offsets at both job start time, and after a job has been running for a while (rewind/fast forward).
> We don't really expose any of these controls to the StreamTask or JobCoordinator in a pluggable way. This is an umbrella ticket to track this work.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)