Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2013/08/20 00:31:48 UTC

[jira] [Created] (SAMZA-22) Flush producers and storage engines without committing

Chris Riccomini created SAMZA-22:
------------------------------------

             Summary: Flush producers and storage engines without committing
                 Key: SAMZA-22
                 URL: https://issues.apache.org/jira/browse/SAMZA-22
             Project: Samza
          Issue Type: Bug
    Affects Versions: 0.6.0
            Reporter: Chris Riccomini


Currently, the only way to trigger a commit call on SystemProducer or StorageEngine classes from within a StreamTask is to call TaskCoordinator.commit(), or to allow the task.commit.ms window to expire. The commit call (and task.commit.ms) also triggers a checkpoint, which might not be desired. We should add a way to call SystemProducer.commit and StorageEngine.commit without triggering a checkpoint.

The motivation for this request is that we have some Samza jobs that want to avoid introducing latency when processing messages, but also want a large producer batch size, so that the job can handle high-throughput scenarios (such as bootstrapping from offset 0) as well. The options available for this pattern right now are:

1. Define a very low task.commit.ms (say 200ms).
2. Define a very low batch size (say 1).
3. Buffer outgoing messages inside StreamTask, disable task.commit.ms, and use task.window.ms to manage commit and flush intervals separately.

The problem with solution 1 is that it results in checkpointing each partition once every 200ms. Checkpointing can be a slow operation, which means it can introduce significant lag. Also, since checkpointing happens in the main loop, it is a blocking operation that prevents any messages from being processed until the checkpoint completes. Consider the case where a SamzaContainer is running with 20 partitions, a 200ms checkpoint interval, and a KafkaCheckpointManager. In such a case, you will be doing 20 sync sends to a Kafka topic with RF 3 and acks -1 every 200ms. This is really slow, since you have to wait for all replicas to read the message, which can take tens of milliseconds, at least. Furthermore, if TaskCoordinator.commit were used instead of task.commit.ms, then all partitions would be flushed whenever any StreamTask calls commit, which leads to 400 commits (20 StreamTasks each triggering a flush of all 20 partitions) per 200ms if you are trying to maintain a 200ms max-latency.
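
The arithmetic behind that example can be sketched as follows. This is only a back-of-the-envelope estimate: the 10ms per-send latency is an assumed lower bound taken from the "tens of milliseconds" remark above, the 20 x 20 reading of the 400 figure is one interpretation, and the class and method names are made up for illustration.

```java
/** Rough cost estimate for solution 1; all names here are hypothetical. */
final class CheckpointCostEstimate {

    /** Milliseconds the main loop is blocked per commit interval if each partition does one sync send. */
    static long blockedMillisPerInterval(int partitions, long syncSendMillis) {
        return partitions * syncSendMillis;
    }

    /** Partition-level commits per interval if every task calls commit and each call flushes all partitions. */
    static long commitsPerInterval(int partitions) {
        return (long) partitions * partitions;
    }

    public static void main(String[] args) {
        int partitions = 20;
        long commitIntervalMillis = 200;
        long syncSendMillis = 10; // assumed lower bound for RF 3, acks -1

        long blocked = blockedMillisPerInterval(partitions, syncSendMillis);
        System.out.println("blocked " + blocked + "ms out of every " + commitIntervalMillis + "ms");
        System.out.println("commits per interval: " + commitsPerInterval(partitions));
    }
}
```

Under these assumptions the container would spend roughly the whole commit interval blocked on checkpoints, leaving little or no time to process messages.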

The problem with solution 2 is that setting a small batch size results in much slower bootstraps.

The problem with solution 3 is that it's a poor user experience.

Two potential solutions:

1. TaskCoordinator API

One solution would be to add TaskCoordinator.flushState and TaskCoordinator.flushProducers, and let StreamTasks know whether they're bootstrapping or "caught up". Then, StreamTasks could call flushProducers (or flushState) in cases where they're caught up, but not aggressively in cases where they're bootstrapping.
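
A minimal sketch of what this could look like from a StreamTask's point of view. None of this exists today: FlushCoordinator is a stand-in for an extended TaskCoordinator, isCaughtUp is an assumed way of exposing the bootstrapping/"caught up" signal, and the recording implementation is only there so the example runs on its own.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a TaskCoordinator extended with the proposed methods. */
interface FlushCoordinator {
    void flushProducers();  // proposed: flush SystemProducers without checkpointing
    void flushState();      // proposed: flush StorageEngines without checkpointing
    boolean isCaughtUp();   // assumed signal: false while bootstrapping
}

/** Fake coordinator that records calls, for demonstration only. */
final class RecordingCoordinator implements FlushCoordinator {
    final List<String> calls = new ArrayList<>();
    boolean caughtUp = false;

    public void flushProducers() { calls.add("flushProducers"); }
    public void flushState() { calls.add("flushState"); }
    public boolean isCaughtUp() { return caughtUp; }
}

/** A task that flushes eagerly only once it has caught up. */
final class LatencySensitiveTask {
    void process(String message, FlushCoordinator coordinator) {
        // ... handle the message and send output to a producer here ...
        if (coordinator.isCaughtUp()) {
            // Steady state: favor latency over batching.
            coordinator.flushProducers();
        }
        // While bootstrapping, do nothing and let large batches fill up.
    }
}

final class FlushApiDemo {
    public static void main(String[] args) {
        RecordingCoordinator coordinator = new RecordingCoordinator();
        LatencySensitiveTask task = new LatencySensitiveTask();

        task.process("bootstrap-message", coordinator);
        coordinator.caughtUp = true;
        task.process("live-message", coordinator);

        System.out.println(coordinator.calls);
    }
}
```

Note that every latency-sensitive StreamTask would have to carry this check itself, which is the user-facing cost of the API approach.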

2. Dynamic Flushing

A different solution would be to make the SamzaContainer try to handle this dynamically. It could do this by triggering a commit on all producers and storage engines whenever "idle" time occurs. I define idle time as the first null message returned by SystemConsumers.choose after a non-null message was returned.

In the bootstrap case, you will always have a series of incoming messages with no null messages (unless the MessageChooser is doing something funky), which means that you will never have "idle" time. This is good because, while you're bootstrapping, SystemProducers will be flushed at batch boundaries and StorageEngines will be flushed according to task.commit.ms.

In the non-bootstrap/steady state/caught up case, where you care about latency, you will periodically receive null messages when you poll SystemConsumers and no new messages are available. On these boundaries, SamzaContainer can flush all StorageEngines and SystemProducers. This has a really nice side effect for low volume topics: if StreamTask.process is called and produces some output, and the next SystemConsumers.choose returns null, your messages will be flushed immediately. No latency would be introduced in this case.
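
The idle-time rule can be sketched as a small simulation. This is not SamzaContainer code: the list of strings stands in for successive results of SystemConsumers.choose (null meaning nothing was available), and the "flush" event stands in for flushing all SystemProducers and StorageEngines. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simulates a run loop that flushes on the first null after a non-null message. */
final class IdleFlushSimulation {

    /** Returns the ordered events ("process:<msg>" or "flush") for a sequence of choose results. */
    static List<String> run(List<String> chooseResults) {
        List<String> events = new ArrayList<>();
        boolean processedSinceLastFlush = false;

        for (String envelope : chooseResults) {
            if (envelope != null) {
                events.add("process:" + envelope);
                processedSinceLastFlush = true;
            } else if (processedSinceLastFlush) {
                // First null after a non-null message: this is "idle" time.
                events.add("flush");
                processedSinceLastFlush = false;
            }
            // Further consecutive nulls trigger nothing.
        }
        return events;
    }

    public static void main(String[] args) {
        // Bootstrap: a solid run of messages, so no idle flush occurs.
        System.out.println(run(Arrays.asList("a", "b", "c")));
        // Steady state: nulls appear between messages, so each burst is flushed.
        System.out.println(run(Arrays.asList("a", null, null, "b", "c", null)));
    }
}
```

In the bootstrap sequence no flush is ever triggered by this rule, so batching and task.commit.ms behave as before; in the steady-state sequence each burst of output is flushed as soon as the consumers come up empty.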

I think dynamic flushing is a better solution because it doesn't require API changes (doesn't require the user to do anything), and I think it should result in optimal behavior in both bootstrap and steady state cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira