Posted to commits@samza.apache.org by "Martin Kleppmann (JIRA)" <ji...@apache.org> on 2014/03/19 00:42:51 UTC

[jira] [Updated] (SAMZA-179) Support a way to shutdown when a task is caught up

     [ https://issues.apache.org/jira/browse/SAMZA-179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Kleppmann updated SAMZA-179:
-----------------------------------

    Attachment: SAMZA-179-v1.patch

Here is a first attempt -- I expect it will need some refinement: https://reviews.apache.org/r/19384/

You can use it like this:

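{code:java}
import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class FooTask implements StreamTask, InitableTask {
  // Newest offset of the input stream, as reported when the task is initialized.
  private String newestOffset;

  public void init(Config config, TaskContext context) {
    // getStreamMetadata is the TaskContext method added by this patch.
    this.newestOffset = context.getStreamMetadata(new SystemStream("kafka", "input-topic")).getNewestOffset();
  }

  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    // Once the message at the newest offset has been processed, request a non-forced
    // shutdown: the container stops only after all tasks have made the same request.
    if (envelope.getOffset().equals(newestOffset)) {
      coordinator.shutdown(false);
    }
  }
}
{code}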

I thought that exposing stream metadata to the task was more flexible than just notifying it when it has caught up, and it doesn't really make the task any more complex.
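The FooTask example compares against a single newest offset, which fits an input with one partition. A task instance that consumes several partitions would need to wait until each of them has reached its own newest offset.

Below is a minimal sketch of that bookkeeping in plain Java with no Samza dependencies. It rests on a few assumptions. Partitions are identified by strings. Offsets are compared with equals(), as in FooTask. A null newest offset means an empty partition. The names CaughtUpTracker and markProcessed are hypothetical and are not part of the patch.

{code:java}
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper: remembers, per partition, the newest offset observed
 * when the task started, and reports when every partition has been processed
 * up to that offset. Assumes the message at the newest offset is delivered.
 */
public class CaughtUpTracker {
  // partition -> newest offset at startup; entries are removed once caught up
  private final Map<String, String> pending = new HashMap<>();

  public CaughtUpTracker(Map<String, String> newestOffsets) {
    for (Map.Entry<String, String> e : newestOffsets.entrySet()) {
      // A null newest offset is treated as an empty partition: nothing to wait for.
      if (e.getValue() != null) {
        pending.put(e.getKey(), e.getValue());
      }
    }
  }

  /** Records a processed message; returns true once all partitions are caught up. */
  public boolean markProcessed(String partition, String offset) {
    String target = pending.get(partition);
    if (target != null && target.equals(offset)) {
      pending.remove(partition);
    }
    return pending.isEmpty();
  }

  public static void main(String[] args) {
    Map<String, String> newest = new HashMap<>();
    newest.put("partition-0", "41");
    newest.put("partition-1", "7");
    newest.put("partition-2", null); // empty partition
    CaughtUpTracker tracker = new CaughtUpTracker(newest);

    System.out.println(tracker.markProcessed("partition-0", "41")); // false: partition-1 still behind
    System.out.println(tracker.markProcessed("partition-1", "6"));  // false
    System.out.println(tracker.markProcessed("partition-1", "7"));  // true: all caught up
  }
}
{code}

In a task, the map would be filled in init() from the stream metadata. process() would then call coordinator.shutdown(false) as soon as markProcessed returns true.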

Some discussion points:

* Should TaskContext.getStreamMetadata return the current metadata from the broker, or just the version of the metadata that was fetched when the job was initialized? BootstrappingChooser just uses the version from job startup, but I thought that people might use the stream metadata for other things too, and it would be really confusing if it were out of date.
* Is it OK for TaskContext.getStreamMetadata to just call straight through to SystemAdmin.getSystemStreamMetadata? It's not ideal, as TaskContext.getStreamMetadata gets called for each task; however, thanks to TopicMetadataCache, we don't actually contact the broker for each task.
* I agree that shutdown should wait for all tasks to be caught up, so I've added a boolean parameter to the shutdown method. force=true keeps the current semantics, force=false waits until all tasks have requested shutdown before actually doing it. A task may continue processing messages, even after it has requested shutdown (can't really do anything else -- can't throw them away!).
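The force flag described in the last point amounts to a small piece of container-side bookkeeping.

Below is a minimal plain-Java sketch of those semantics, not the code in the patch. It assumes that the container knows the full set of task names and checks a flag from its run loop. ShutdownCoordinator, requestShutdown and shouldShutdown are hypothetical names.

{code:java}
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch of the proposed shutdown semantics: force=true stops the
 * container immediately; force=false only records the request, and the
 * container stops once every task instance has asked to shut down.
 * Tasks keep processing messages after requesting a non-forced shutdown.
 */
public class ShutdownCoordinator {
  private final Set<String> allTasks;
  private final Set<String> requested = new HashSet<>();
  private boolean forced = false;

  public ShutdownCoordinator(Set<String> allTasks) {
    this.allTasks = new HashSet<>(allTasks);
  }

  /** Called when a task requests shutdown (mirrors coordinator.shutdown(force)). */
  public synchronized void requestShutdown(String taskName, boolean force) {
    if (!allTasks.contains(taskName)) {
      throw new IllegalArgumentException("Unknown task: " + taskName);
    }
    requested.add(taskName);
    if (force) {
      forced = true;
    }
  }

  /** Checked by the container's run loop, e.g. after each processed message. */
  public synchronized boolean shouldShutdown() {
    return forced || requested.containsAll(allTasks);
  }

  public static void main(String[] args) {
    Set<String> tasks = new HashSet<>();
    tasks.add("Partition 0");
    tasks.add("Partition 1");
    ShutdownCoordinator c = new ShutdownCoordinator(tasks);

    c.requestShutdown("Partition 0", false);
    System.out.println(c.shouldShutdown()); // false: Partition 1 has not caught up yet
    c.requestShutdown("Partition 1", false);
    System.out.println(c.shouldShutdown()); // true: every task has requested shutdown
  }
}
{code}

Keeping the requests in a set makes a repeated request from the same task harmless. This matters because a task may call shutdown(false) again for messages it processes after its first request.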

Happy to make changes if you disagree with those choices. What do you think?

> Support a way to shutdown when a task is caught up
> --------------------------------------------------
>
>                 Key: SAMZA-179
>                 URL: https://issues.apache.org/jira/browse/SAMZA-179
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Martin Kleppmann
>         Attachments: SAMZA-179-v1.patch
>
>
> There is currently no easy way for a task to process all messages from some offset all the way to the head of a stream, and then shut down. This behavior is useful in cases where a Samza job is re-processing old data, or for Samza jobs that only run periodically.
> Now that we have SAMZA-147 merged in, SamzaContainer has access to the stream metadata required to do these operations. We should expose this to the tasks in some way, so they can decide when to shut down.
> Two ways that I can think of doing this are:
> 1. Add some sort of mix-in interface (à la InitableTask, ClosableTask, etc.) that has a callback that's triggered whenever the last message processed by a task is equal to metadata.newest for the SystemStreamPartition.
> 2. Expose the SystemStreamMetadata information through the InitableTask.init's TaskContext object.
> I favor (2) right now, but we should dig into the code and think through other potential solutions.
> Also, something else to consider is that TaskCoordinator.shutdown currently shuts down the task immediately. There is an argument to be made that it should just mean "cease to process any messages for this task instance", and SamzaContainer should only be shut down when ALL task instances have called coordinator.shutdown (or there is an error). This would be useful in situations where a StreamTask wishes to shut down when it's fully caught up to the head across all partitions. Right now, the first task instance to call shutdown (the first one caught up) would shut down the entire SamzaContainer. This would shut down partitions that might not be caught up yet.



--
This message was sent by Atlassian JIRA
(v6.2#6252)