You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2014/04/30 19:12:15 UTC

[jira] [Commented] (SAMZA-253) Consensus shutdown API

    [ https://issues.apache.org/jira/browse/SAMZA-253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13985757#comment-13985757 ] 

Chris Riccomini commented on SAMZA-253:
---------------------------------------

1. The TaskCoordinators/TaskInstance setup still bugs me a bit. The reason it bugs me is that we've made the coordinator mutable, and live beyond a single process loop, but have kept the APIs the same.

What do you think about just giving the TaskInstanceCoordinator as a parameter in TaskInstance's constructor, and eliminating the (..., coordinator: TaskCoordinators) parameter from all methods in SamzaContainer/TaskInstance? The SamzaContainer can still use the TaskCoordinators to check commit/shutdown, and reset.

The reason I like this approach a bit more is because I think it fits a bit better with how the coordinator works now. Before, we had to pass the coordinator into every method because it's lifecycle only extended to one process() invocation. Without this, it seems better to just give it to the TaskInstance once.

2. One other thing to think through is that having the coordinator live beyond a single process() call means that a StreamTask can hold on to it and potentially do weird things like manipulate it from another thread. In general, we haven't allows this, which is why the ReadableCoordinator only lived for one process-loop. The only place where we deviate from this (that I recall) is with the MetricsRegistry in the TaskContext. In a sense, this is the same problem that I describe in (1), above. We are continuing to tread the coordinator as a single-process-loop object, when it's not anymore.

What do you think about keeping ReadableCoordinator the way it is (GC'd at end of process()-loop), and having a separate shutdown object that the SamzaCoordinator updates? (I realize this is a different approach than what I was saying in (1) above, but bear with me.) This approach would look at the coordinator's shutdown invocation (SHUTDOWN_NOW, or WAIT_FOR_ALL_TASKS), and update the shutdown object accordingly. The SamzaContainer would then check the shutdown object to see if it should shutdown. I kind of like this approach because it keeps the ReadableCoordinator more isolated, which I worry about when giving objects to StreamTasks.

3. Also, I'm a little worried about performance here. We're adding three new loops per-process loop. These loops have proven to be slow (especially the Scala loops). I think this is OK, though, since we're going to have to tackle the loop-performance issue holistically in the SamzaContainer anyway.

Taking the approach I outline in (2) would eliminate some of these loops because you could simply discard the process()-local coordinator, rather than iterating over all of them to reset them.

> Consensus shutdown API
> ----------------------
>
>                 Key: SAMZA-253
>                 URL: https://issues.apache.org/jira/browse/SAMZA-253
>             Project: Samza
>          Issue Type: New Feature
>            Reporter: Martin Kleppmann
>            Assignee: Martin Kleppmann
>             Fix For: 0.7.0
>
>         Attachments: SAMZA-253.1.patch
>
>
> Extracted from SAMZA-179.
> At the moment, TaskCoordinator.shutdown() can be called by any task, and immediately shuts down the container as soon as the current message has finished processing. That is appropriate in some cases, but not always. Sometimes, what we actually want is for each task to vote that it's ready to shut down, and for the container to be shut down when all tasks within that container have voted.
> A first implementation of this is on https://reviews.apache.org/r/19384/ but it's mixed up with several other concerns. This ticket is to extract only the shutdown API changes (TaskCoordinator.ShutdownMethod) from that patch, and to address the relevant comments made on that RB.



--
This message was sent by Atlassian JIRA
(v6.2#6252)