Posted to commits@samza.apache.org by "Jake Maes (JIRA)" <ji...@apache.org> on 2017/09/20 18:12:00 UTC

[jira] [Updated] (SAMZA-1392) KafkaSystemProducer performance and correctness with concurrent sends and flushes

     [ https://issues.apache.org/jira/browse/SAMZA-1392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jake Maes updated SAMZA-1392:
-----------------------------
    Attachment: Producer Performance Tests for SAMZA-1392 - Sheet1.pdf

Attaching some performance test results for the patch. 

The results look pretty good across the board. As expected, the biggest improvement was with compression and multithreading enabled, but it's slightly faster all around. The fix to the flush logic (first bullet point in the list below) should have slowed us down a bit, so that must have been offset by the other improvements in the change. 

I’m a little snowblind from looking at the spreadsheet for the past few days, so if you spot any anomalies, let me know. :-)

Background:
In short, we wanted to remove the sendLock that we use to get the latestFuture because the KafkaProducer does compression and serialization in the user thread, which causes contention on that lock when multithreading is enabled. 

Recall that we were tracking the latestFuture and waiting on it for flush(). That code was written before Kafka's flush() was available and working. Since the Kafka flush() operation is now available in all the versions supported by Samza, the plan was to switch to that API. With that model, we can flush without tracking the latestFuture, which means we can remove the sendLock.
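
To make the two models concrete, here is a minimal sketch using only the JDK. ToyProducer, LockedSender, and FlushingSender are hypothetical stand-ins invented for illustration; they are not the Kafka client API or the actual Samza code. The toy producer's flush() is a barrier task on a single I/O thread, which only approximates what a producer-wide flush provides.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class FlushDemo {
    // Hypothetical stand-in for an asynchronous producer; NOT the Kafka client API.
    static class ToyProducer {
        // One I/O thread delivers records in submission order.
        private final ExecutorService io = Executors.newSingleThreadExecutor();
        final AtomicInteger delivered = new AtomicInteger();
        final AtomicInteger deliveredBytes = new AtomicInteger();

        Future<?> send(String record) {
            // Stands in for serialization/compression done in the caller's thread.
            byte[] payload = record.getBytes(StandardCharsets.UTF_8);
            return io.submit(() -> {
                deliveredBytes.addAndGet(payload.length);
                delivered.incrementAndGet();
            });
        }

        // Barrier task: returns once everything submitted earlier has been delivered.
        void flush() { await(io.submit(() -> { })); }

        void close() { io.shutdown(); }
    }

    static void await(Future<?> f) {
        try {
            f.get();
        } catch (Exception e) {
            throw new IllegalStateException("send or flush failed", e);
        }
    }

    // Old model (simplified): sends share a lock so flush() can find the latest future.
    static class LockedSender {
        private final ToyProducer producer;
        private final Object sendLock = new Object();
        private Future<?> latestFuture;
        LockedSender(ToyProducer producer) { this.producer = producer; }
        void send(String record) {
            // The caller-thread work inside send() is serialized by this lock.
            synchronized (sendLock) { latestFuture = producer.send(record); }
        }
        void flush() {
            Future<?> f;
            synchronized (sendLock) { f = latestFuture; }
            if (f != null) await(f);
        }
    }

    // New model: no shared lock; flush() delegates to the producer.
    static class FlushingSender {
        private final ToyProducer producer;
        FlushingSender(ToyProducer producer) { this.producer = producer; }
        void send(String record) { producer.send(record); }
        void flush() { producer.flush(); }
    }

    // Sends from several threads without a lock, then flushes once.
    static int run(int threads, int perThread) {
        ToyProducer producer = new ToyProducer();
        FlushingSender sender = new FlushingSender(producer);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    sender.send("task-" + id + "-msg-" + i);
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) w.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        sender.flush();
        int count = producer.delivered.get();
        producer.close();
        return count;
    }

    public static void main(String[] args) {
        System.out.println("delivered after flush: " + run(4, 1000));
    }
}
```

In the locked variant, whatever work send() does in the caller's thread runs while holding sendLock, which is where the contention comes from. The flushing variant keeps no per-send state at all.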

Along the way, I found and fixed a couple of data loss issues, which mostly stem from our synchronous SystemProducer API trying to handle asynchronous error scenarios with multithreading enabled.

Summary of the improvements:
* Fixed a bug where the flush() logic waited on the latest Future per task rather than per partition, which could lead to data loss.
* Fixed a bug with exception handling when task.async.commit=true that could cause data loss.
* Significantly improved send() performance with multithreading by removing the sendLock in favor of the KafkaProducer flush() method. (See the performance results in the Google sheet.)
* Improved availability guarantees by introducing task.drop.producer.errors, which guarantees that a container will never fail due to async producer errors. This fixes the case where an application wants to swallow producer exceptions but can't do so when they occur in flush(), so it still sees occasional container failures.
* The code is now much simpler and therefore easier to maintain.
* More thorough unit tests added. 
* Potentially better batching in low throughput scenarios now because linger.ms=10 by default.
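
As an illustration of the first bullet, the sketch below shows one reading of why waiting on only the latest Future can hide a failure: the latest send may go to a different partition than an earlier send that failed. The scenario and all names here are hypothetical and use plain CompletableFutures, not the Samza or Kafka code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class LatestFutureDemo {
    // Waits only on the most recent send, as in per-task latest-future tracking (simplified).
    static boolean latestOnlySeesFailure(List<CompletableFuture<Void>> sends) {
        return failed(sends.get(sends.size() - 1));
    }

    // Checks every outstanding send, which is what a producer-wide flush covers.
    static boolean allSeesFailure(List<CompletableFuture<Void>> sends) {
        return sends.stream().anyMatch(LatestFutureDemo::failed);
    }

    private static boolean failed(CompletableFuture<Void> f) {
        try {
            f.join();
            return false;
        } catch (RuntimeException e) {
            // join() reports failures as unchecked exceptions.
            return true;
        }
    }

    // Hypothetical scenario: the send to partition 0 fails, a later send to partition 1 succeeds.
    static List<CompletableFuture<Void>> scenario() {
        CompletableFuture<Void> partition0 = new CompletableFuture<>();
        partition0.completeExceptionally(new RuntimeException("partition 0 send failed"));
        CompletableFuture<Void> partition1 = CompletableFuture.completedFuture(null);
        return Arrays.asList(partition0, partition1);
    }

    public static void main(String[] args) {
        List<CompletableFuture<Void>> sends = scenario();
        System.out.println("latest-only sees failure: " + latestOnlySeesFailure(sends));
        System.out.println("all sends sees failure:   " + allSeesFailure(sends));
    }
}
```

If a checkpoint followed the latest-only wait in this scenario, the failed partition 0 record would be silently skipped.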


> KafkaSystemProducer performance and correctness with concurrent sends and flushes
> ---------------------------------------------------------------------------------
>
>                 Key: SAMZA-1392
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1392
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Jake Maes
>            Assignee: Jake Maes
>             Fix For: 0.14.0
>
>         Attachments: Producer Performance Tests for SAMZA-1392 - Sheet1.pdf
>
>
> There are 2 issues we need to fix in the KafkaSystemProducer when sends and flushes are called concurrently:
> 1. Concurrent sends contend for the sendLock, especially when producer compression is enabled. The fix is to use the producer.flush() API, which Kafka has supported since at least version 0.9.x. This way we won't need to track the latest future, so we won't need the lock.
> 2. When task.async.commit is enabled, the threads calling send() could set the exceptionInCallback to null before the exception is handled in user code or flush(). This could allow us to checkpoint offsets for which the corresponding output was not successfully sent.
> The short term solution here is to only handle the callback exceptions from flush() and allow users to configure the exceptions as ignorable in case they don't want flush to fail.
> The long term solution is to support a fully asynchronous SystemProducer. Ticket SAMZA-1393.
> I found issue #2 while working on issue #1, so while they're separate issues, it's easier to fix them with one ticket/patch.
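
The short term solution for issue #2 can be sketched roughly as follows. This is a minimal model under stated assumptions, not the patch itself: ErrorTracker and its methods are hypothetical names, the real flush() would first flush the underlying producer, and the dropProducerErrors flag only mimics what task.drop.producer.errors is described as doing.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class CallbackErrorDemo {
    // Hypothetical model of the fix; names are illustrative, not Samza's actual code.
    static class ErrorTracker {
        private final AtomicReference<Exception> exceptionInCallback = new AtomicReference<>();
        private final boolean dropProducerErrors;
        final AtomicInteger dropped = new AtomicInteger();

        ErrorTracker(boolean dropProducerErrors) {
            this.dropProducerErrors = dropProducerErrors;
        }

        // Called from the producer's callback thread: keep the first error, never overwrite it.
        void onSendFailure(Exception e) {
            exceptionInCallback.compareAndSet(null, e);
        }

        // send() neither reads nor clears the pending error, so a concurrent
        // sender cannot reset it before flush() has handled it.
        void send(String record) {
            // Hand the record to the asynchronous producer (omitted in this sketch).
        }

        // Only flush() consumes the error. A commit that follows a successful flush()
        // therefore cannot checkpoint past output that failed, unless errors are dropped.
        void flush() {
            Exception e = exceptionInCallback.getAndSet(null);
            if (e == null) {
                return;
            }
            if (dropProducerErrors) {
                dropped.incrementAndGet();
                return;
            }
            throw new IllegalStateException("send failed before flush", e);
        }
    }

    static String demo(boolean dropProducerErrors) {
        ErrorTracker tracker = new ErrorTracker(dropProducerErrors);
        tracker.onSendFailure(new RuntimeException("broker unavailable"));
        tracker.send("next record"); // does not clear the pending error
        try {
            tracker.flush();
            return "flush ok, dropped=" + tracker.dropped.get();
        } catch (IllegalStateException e) {
            return "flush failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // prints "flush failed: broker unavailable"
        System.out.println(demo(true));  // prints "flush ok, dropped=1"
    }
}
```

Keeping the error in a single atomic slot that only flush() clears is a deliberate simplification; it keeps the first failure and ignores later ones.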


