Posted to users@kafka.apache.org by Lukasz Druminski <lu...@allegrogroup.com> on 2016/11/08 16:23:33 UTC

Protecting kafka-producer against unavailability of all brokers (request.timeout.ms)

Hi,

We are using kafka-producer 0.8.2 in production. We configured it with
retries set to Integer.MAX_VALUE and buffer.memory set to 1GB.
Thanks to this setup we are protected against unavailability of all brokers
for around one hour (given our production traffic).
For example, when all brokers in a single DC/zone are down, kafka-producer
buffers all incoming messages in its accumulator until it is full. When the
brokers become available again, the producer sends all the buffered
messages to Kafka. This gives us time to recover without losing any messages.
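
For concreteness, a minimal sketch of that configuration (the broker
addresses and serializers are placeholders, not our actual values):

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerConfig;

  Properties props = new Properties();
  // Placeholder broker list
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
  // Retry forever so sends survive long broker outages
  props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
  // 1GB accumulator: roughly one hour of our traffic
  props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024L * 1024L * 1024L);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer");
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer");
  KafkaProducer<String, String> producer = new KafkaProducer<>(props);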

Now we would like to migrate to the newest kafka-producer, 0.10.1, but we
have a problem preserving the behaviour described above because of changes
introduced to the producer library:

- proposal about adding request timeout to NetworkClient
https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
- producer record can stay in RecordAccumulator forever if leader is not
available https://issues.apache.org/jira/browse/KAFKA-1788
- add a request timeout to NetworkClient
https://issues.apache.org/jira/browse/KAFKA-2120

These changes introduce the request.timeout.ms parameter, which covers:

1. the actual network RTT
2. server-side replication time
3. the new mechanism for aborting expired batches
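
In other words, a single setting now bounds all three concerns, e.g.
(value illustrative):

  // One knob for network RTT, replication time, and batch expiry
  props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);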

When brokers are unavailable for longer than request.timeout.ms,
kafka-producer starts dropping batches from the accumulator, failing each
one with a TimeoutException delivered to the callback with the message:

  "Batch containing " + recordCount + " record(s) expired due to timeout
while requesting metadata from brokers for " + topicPartition
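
For illustration, a minimal sketch of a send whose callback would observe
this expiry (topic and payload are placeholders):

  import org.apache.kafka.clients.producer.Callback;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.clients.producer.RecordMetadata;
  import org.apache.kafka.common.errors.TimeoutException;

  producer.send(new ProducerRecord<>("my-topic", "payload"), new Callback() {
      @Override
      public void onCompletion(RecordMetadata metadata, Exception exception) {
          if (exception instanceof TimeoutException) {
              // The batch holding this record expired in the accumulator
              // and was dropped; the record never reached a broker.
              System.err.println("Record expired: " + exception.getMessage());
          }
      }
  });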

To protect against unavailability of all brokers with the newest
kafka-producer, I see two possible workarounds:

- I could increase request.timeout.ms to one hour, so batches would only be
dropped after that time, but such a value is unreasonable for (1) and (2)
- I could catch the TimeoutException and send the corresponding message to
kafka-producer again, but then I have no guarantee that there will be free
space in the accumulator (a sketch of this follows the list)
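
To illustrate the second workaround, a sketch of a resend-on-expiry
callback (hypothetical, and not a real fix: send() itself can block for up
to max.block.ms when the accumulator is full, and the callback runs on the
producer's I/O thread):

  final ProducerRecord<String, String> record =
      new ProducerRecord<>("my-topic", "payload"); // placeholder record
  producer.send(record, new Callback() {
      @Override
      public void onCompletion(RecordMetadata metadata, Exception exception) {
          if (exception instanceof TimeoutException) {
              // May expire again; no guarantee of free accumulator space
              producer.send(record, this);
          }
      }
  });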

In my opinion the timeout for (3) should be independent from (1) and (2),
or dropping expired batches should be an optional feature.
What do you think about this issue? Do you have any suggestions or
solutions for this use case?

Best regards,
Luke Druminski

Re: Protecting kafka-producer against unavailability of all brokers (request.timeout.ms)

Posted by su...@gmail.com.
I agree that the accumulator timeout should be independent from the other two you mentioned. We at LinkedIn have come up with a solution, and I'll create a KIP for it soon. In essence, we want a batch.expiry.ms configuration that directly specifies the accumulator timeout, separately from request.timeout.ms. Proliferation of request.timeout.ms "up" the stack has been painful. There are a number of nuances to it, and they will be discussed in the KIP. Stay tuned.
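
If that lands as described, the two concerns could be tuned independently, roughly like this (batch.expiry.ms is the proposed name, not an existing producer setting; values are illustrative):

  // request.timeout.ms stays small: network RTT + replication only
  props.put("request.timeout.ms", 30000);
  // Proposed batch.expiry.ms would bound time spent in the accumulator
  props.put("batch.expiry.ms", 60 * 60 * 1000); // one hour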

-Sumant

Sent from my iPad
