Posted to users@kafka.apache.org by Samuel Chase <sa...@gmail.com> on 2015/03/20 08:01:41 UTC

New Java Producer Client handling case where Kafka is unreachable

Hello Everyone,

In the new Java Producer API, the Callback code in
KafkaProducer.send is run after there is a response from the Kafka
server. This can be used if some error handling needs to be done based
on the response.
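
For illustration, this is roughly the pattern I have in mind (a minimal
sketch; the broker address, topic name and error-handling hook are
placeholders, not our real code):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class CallbackSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<String, String>("my-topic", "value"),
                new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {
                            // error handling based on the failed send goes here
                        }
                    }
                });
            producer.close();
        }
    }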

When using the new Java Kafka Producer, I've noticed that when the
Kafka server is down/unreachable, KafkaProducer.send blocks until the
Kafka server is back up again.

We've been using the older Scala Producer and when Kafka is
unreachable it throws an exception after a few retries. This exception
is caught and then some error handling code is run.
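
For reference, the pattern we rely on with the old producer looks roughly
like this (a sketch from memory; the broker list, topic and retry count are
made up for the example):

    import java.util.Properties;
    import kafka.common.FailedToSendMessageException;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class OldProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("message.send.max.retries", "3");

            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
            try {
                KeyedMessage<String, String> message =
                    new KeyedMessage<String, String>("my-topic", "a message");
                producer.send(message);
            } catch (FailedToSendMessageException e) {
                // the old producer gives up after the configured retries,
                // so our error handling runs here
            } finally {
                producer.close();
            }
        }
    }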

- What is the recommended way of using the new Java Producer API to
handle the case where Kafka is unreachable temporarily?

I don't want to wait until it is reachable again before I know that
the send failed.

Any help or advice would be much appreciated.

Thanks,

Samuel

Re: New Java Producer Client handling case where Kafka is unreachable

Posted by Samuel Chase <sa...@gmail.com>.
On Fri, Mar 20, 2015 at 3:24 PM, tao xiao <xi...@gmail.com> wrote:
> The underlying send runs in a different thread and doesn't block
> producer.send(). One way I can think of to detect this is to set
> block.on.buffer.full=false and catch BufferExhaustedException, then check if
> the broker is reachable. But this is a hacky way, as BufferExhaustedException
> may indicate other errors, such as insufficient buffer memory.

What is the correct method to detect whether the broker is reachable?

Re: New Java Producer Client handling case where Kafka is unreachable

Posted by tao xiao <xi...@gmail.com>.
The underlying send runs in a different thread and doesn't block
producer.send(). One way I can think of to detect this is to set
block.on.buffer.full=false and catch BufferExhaustedException, then check if
the broker is reachable. But this is a hacky way, as BufferExhaustedException
may indicate other errors, such as insufficient buffer memory.
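
Something along these lines is what I mean (only a sketch; the broker
address and topic are invented for the example, and I have not verified it
covers every failure case):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.BufferExhaustedException;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class BufferExhaustedSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            // fail fast instead of blocking when the record buffer fills up
            props.put("block.on.buffer.full", "false");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            try {
                ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>("my-topic", "value");
                producer.send(record);
            } catch (BufferExhaustedException e) {
                // the buffer filled up, possibly because the broker is
                // unreachable, but it can also mean we are producing faster
                // than buffer.memory allows, so check reachability separately
            } finally {
                producer.close();
            }
        }
    }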

On Fri, Mar 20, 2015 at 5:12 PM, Samuel Chase <sa...@gmail.com> wrote:

> @Tao,
>
> On Fri, Mar 20, 2015 at 12:39 PM, tao xiao <xi...@gmail.com> wrote:
> > You can set producer property retries not equal to 0. Details can be
> found
> > here
> > http://kafka.apache.org/documentation.html#newproducerconfigs
>
> I set "retries" to "1", but send is still blocking until the Kafka
> Server is reachable again.
>
> @Everyone,
>
> After successful initialization of the KafkaProducer, If Kafka becomes
> unreachable for a short while, any further sends wait indefinitely
> (when calling .get() on the future returned by .send()). Is there any
> way of detecting that Kafka is unreachable?
>
> Samuel
>



-- 
Regards,
Tao

Re: New Java Producer Client handling case where Kafka is unreachable

Posted by Samuel Chase <sa...@gmail.com>.
@Mayuresh

On Fri, Mar 20, 2015 at 10:30 PM, Mayuresh Gharat
<gh...@gmail.com> wrote:
> But if the entire kafka cluster is down, it would try for some configured
> number of retries and would return back an error in future. This is my
> understanding. Please correct me if I am wrong.

With the "retries" option set to 1 and "metadata.fetch.timeout.ms" set to
1000ms, I still noticed that it was blocking indefinitely. The future only
errors out /after/ the cluster is back up again.

Re: New Java Producer Client handling case where Kafka is unreachable

Posted by Mayuresh Gharat <gh...@gmail.com>.
I think if a leader is down, the producer would issue a metadata request,
get the new leader, and start producing to it. But if the entire Kafka
cluster is down, it would try for some configured number of retries and
then return an error in the future. This is my understanding. Please
correct me if I am wrong.

Thanks,

Mayuresh

On Fri, Mar 20, 2015 at 9:34 AM, Ewen Cheslack-Postava <ew...@confluent.io>
wrote:

> Even if you have metadata cached, if the broker isn't available then
> messages can get stuck in the producer indefinitely. Currently the new
> producer doesn't have any client-side timeouts, which is a bug. See
> https://issues.apache.org/jira/browse/KAFKA-1788 for more details.
>
>
> On Fri, Mar 20, 2015 at 8:09 AM, Jiangjie Qin <jq...@linkedin.com.invalid>
> wrote:
>
> > This is correct when you send to a topic for the first time. After that
> > the metadata will be cached, the metadata cache has an age and after it
> > expires, metadata will be refreshed.
> > So the time a producer found a broker is not reachable is the minimum
> > value of the following times:
> > 1. Linger.ms + retries * retry.backoff.ms
> > 2. Metadata.max.age.ms
> > 3. Metadata.fetch.timeout.ms (only when sending to a topic for the first
> > time)
> >
> > Typically you will hit the first one. The default value is linger.ms=0,
> > retries=0. But you need to send records with callback to detect the
> > failure.
> >
> > Jiangjie (Becket) Qin
> >
> > On 3/20/15, 3:46 AM, "Samuel Chase" <sa...@gmail.com> wrote:
> >
> > >@Sunil
> > >
> > >The else branch will be executed if
> > >`metadata.fetch().partitionsForTopic(topic)` returns non NULL value. I
> > >assume that when Kafka is unreachable, it will return NULL.
> > >`waitOnMetadata()` then returns; we never enter the else branch when
> > >Kafka is unreachable.
> > >
> > >@Everyone: Is this explanation correct?
> > >
> > >On Fri, Mar 20, 2015 at 3:56 PM, sunil kalva <sa...@gmail.com>
> > wrote:
> > >> @Samuel
> > >> My point was
> > >> The else branch of the code will be executed when metadata is not
> > >> available, and metadata is not available when kafka cluster is not
> > >>rachable.
> > >>
> > >> please correct me if i am wrong..
> > >>
> > >> On Fri, Mar 20, 2015 at 3:43 PM, Samuel Chase <sa...@gmail.com>
> > >>wrote:
> > >>
> > >>> @Sunil
> > >>>
> > >>> On Fri, Mar 20, 2015 at 3:36 PM, sunil kalva <sa...@gmail.com>
> > >>>wrote:
> > >>> > I think KafkaProducer.send method blocks until it fetches partition
> > >>> > metadata for configured time using "metadata.fetch.timeout.ms",
> once
> > >>> time
> > >>> > out it throws TimeoutException. You might be experiencing
> > >>> TimeoutException ?
> > >>>
> > >>> My co-worker pointed out that over here:
> > >>>
> > >>>
> > >>>
> > https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apa
> > >>>che/kafka/clients/producer/KafkaProducer.java#L368
> > >>>
> > >>> waitOnMetadata just returns. The else branch of the code is not
> > >>> executed when Kafka is unreachable.
> > >>>
> > >>> Trying to investigate what else must be causing the wait.
> > >>>
> > >>
> > >>
> > >>
> > >> --
> > >> SunilKalva
> >
> >
>
>
> --
> Thanks,
> Ewen
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125

Re: New Java Producer Client handling case where Kafka is unreachable

Posted by Samuel Chase <sa...@gmail.com>.
@Ewen

On Fri, Mar 20, 2015 at 10:04 PM, Ewen Cheslack-Postava
<ew...@confluent.io> wrote:
> Even if you have metadata cached, if the broker isn't available then
> messages can get stuck in the producer indefinitely. Currently the new
> producer doesn't have any client-side timeouts, which is a bug. See
> https://issues.apache.org/jira/browse/KAFKA-1788 for more details.

Thanks! That clears it up, then.

Client-side timeouts are what is needed. I will read up on the ticket.

Samuel

Re: New Java Producer Client handling case where Kafka is unreachable

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Even if you have metadata cached, if the broker isn't available then
messages can get stuck in the producer indefinitely. Currently the new
producer doesn't have any client-side timeouts, which is a bug. See
https://issues.apache.org/jira/browse/KAFKA-1788 for more details.
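
Until that is fixed, one workaround on the application side is to bound how
long you yourself wait on the returned future. A sketch (broker address,
topic and the 5-second deadline are arbitrary); note this only caps your
wait, the record can still sit in the producer:

    import java.util.Properties;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class BoundedWaitSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            ProducerRecord<String, String> record =
                new ProducerRecord<String, String>("my-topic", "value");
            Future<RecordMetadata> future = producer.send(record);
            try {
                // wait at most 5 seconds for an ack instead of blocking forever
                RecordMetadata metadata = future.get(5, TimeUnit.SECONDS);
                System.out.println("acked at offset " + metadata.offset());
            } catch (TimeoutException e) {
                // no ack within the deadline: treat the send as failed/unknown;
                // the record may still be delivered if the broker comes back
            } finally {
                producer.close();
            }
        }
    }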


On Fri, Mar 20, 2015 at 8:09 AM, Jiangjie Qin <jq...@linkedin.com.invalid>
wrote:

> This is correct when you send to a topic for the first time. After that
> the metadata will be cached, the metadata cache has an age and after it
> expires, metadata will be refreshed.
> So the time a producer found a broker is not reachable is the minimum
> value of the following times:
> 1. Linger.ms + retries * retry.backoff.ms
> 2. Metadata.max.age.ms
> 3. Metadata.fetch.timeout.ms (only when sending to a topic for the first
> time)
>
> Typically you will hit the first one. The default value is linger.ms=0,
> retries=0. But you need to send records with callback to detect the
> failure.
>
> Jiangjie (Becket) Qin
>
> On 3/20/15, 3:46 AM, "Samuel Chase" <sa...@gmail.com> wrote:
>
> >@Sunil
> >
> >The else branch will be executed if
> >`metadata.fetch().partitionsForTopic(topic)` returns non NULL value. I
> >assume that when Kafka is unreachable, it will return NULL.
> >`waitOnMetadata()` then returns; we never enter the else branch when
> >Kafka is unreachable.
> >
> >@Everyone: Is this explanation correct?
> >
> >On Fri, Mar 20, 2015 at 3:56 PM, sunil kalva <sa...@gmail.com>
> wrote:
> >> @Samuel
> >> My point was
> >> The else branch of the code will be executed when metadata is not
> >> available, and metadata is not available when kafka cluster is not
> >>rachable.
> >>
> >> please correct me if i am wrong..
> >>
> >> On Fri, Mar 20, 2015 at 3:43 PM, Samuel Chase <sa...@gmail.com>
> >>wrote:
> >>
> >>> @Sunil
> >>>
> >>> On Fri, Mar 20, 2015 at 3:36 PM, sunil kalva <sa...@gmail.com>
> >>>wrote:
> >>> > I think KafkaProducer.send method blocks until it fetches partition
> >>> > metadata for configured time using "metadata.fetch.timeout.ms", once
> >>> time
> >>> > out it throws TimeoutException. You might be experiencing
> >>> TimeoutException ?
> >>>
> >>> My co-worker pointed out that over here:
> >>>
> >>>
> >>>
> https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apa
> >>>che/kafka/clients/producer/KafkaProducer.java#L368
> >>>
> >>> waitOnMetadata just returns. The else branch of the code is not
> >>> executed when Kafka is unreachable.
> >>>
> >>> Trying to investigate what else must be causing the wait.
> >>>
> >>
> >>
> >>
> >> --
> >> SunilKalva
>
>


-- 
Thanks,
Ewen

Re: New Java Producer Client handling case where Kafka is unreachable

Posted by Jiangjie Qin <jq...@linkedin.com.INVALID>.
This is correct when you send to a topic for the first time. After that
the metadata will be cached; the metadata cache has an age, and after it
expires the metadata will be refreshed.
So the time it takes a producer to find out that a broker is not reachable
is the minimum of the following times:
1. linger.ms + retries * retry.backoff.ms
2. metadata.max.age.ms
3. metadata.fetch.timeout.ms (only when sending to a topic for the first
time)

Typically you will hit the first one. The default values are linger.ms=0
and retries=0. But you need to send records with a callback to detect the
failure.
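
Roughly like this (just a sketch; the broker address, topic and the timeout
values are arbitrary examples, not recommendations):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class FailureDetectionSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            // knobs that bound how quickly a failure can surface
            props.put("linger.ms", "0");
            props.put("retries", "3");
            props.put("retry.backoff.ms", "100");
            props.put("metadata.max.age.ms", "60000");
            props.put("metadata.fetch.timeout.ms", "1000");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<String, String>("my-topic", "value"),
                new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {
                            // an unreachable broker is reported here, if at all
                        }
                    }
                });
            producer.close();
        }
    }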

Jiangjie (Becket) Qin

On 3/20/15, 3:46 AM, "Samuel Chase" <sa...@gmail.com> wrote:

>@Sunil
>
>The else branch will be executed if
>`metadata.fetch().partitionsForTopic(topic)` returns non NULL value. I
>assume that when Kafka is unreachable, it will return NULL.
>`waitOnMetadata()` then returns; we never enter the else branch when
>Kafka is unreachable.
>
>@Everyone: Is this explanation correct?
>
>On Fri, Mar 20, 2015 at 3:56 PM, sunil kalva <sa...@gmail.com> wrote:
>> @Samuel
>> My point was
>> The else branch of the code will be executed when metadata is not
>> available, and metadata is not available when kafka cluster is not
>>rachable.
>>
>> please correct me if i am wrong..
>>
>> On Fri, Mar 20, 2015 at 3:43 PM, Samuel Chase <sa...@gmail.com>
>>wrote:
>>
>>> @Sunil
>>>
>>> On Fri, Mar 20, 2015 at 3:36 PM, sunil kalva <sa...@gmail.com>
>>>wrote:
>>> > I think KafkaProducer.send method blocks until it fetches partition
>>> > metadata for configured time using "metadata.fetch.timeout.ms", once
>>> time
>>> > out it throws TimeoutException. You might be experiencing
>>> TimeoutException ?
>>>
>>> My co-worker pointed out that over here:
>>>
>>> 
>>>https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apa
>>>che/kafka/clients/producer/KafkaProducer.java#L368
>>>
>>> waitOnMetadata just returns. The else branch of the code is not
>>> executed when Kafka is unreachable.
>>>
>>> Trying to investigate what else must be causing the wait.
>>>
>>
>>
>>
>> --
>> SunilKalva


Re: New Java Producer Client handling case where Kafka is unreachable

Posted by Samuel Chase <sa...@gmail.com>.
@Sunil

The else branch will be executed if
`metadata.fetch().partitionsForTopic(topic)` returns a non-NULL value. I
assume that when Kafka is unreachable, it will return NULL.
`waitOnMetadata()` then returns; we never enter the else branch when
Kafka is unreachable.

@Everyone: Is this explanation correct?

On Fri, Mar 20, 2015 at 3:56 PM, sunil kalva <sa...@gmail.com> wrote:
> @Samuel
> My point was
> The else branch of the code will be executed when metadata is not
> available, and metadata is not available when kafka cluster is not rachable.
>
> please correct me if i am wrong..
>
> On Fri, Mar 20, 2015 at 3:43 PM, Samuel Chase <sa...@gmail.com> wrote:
>
>> @Sunil
>>
>> On Fri, Mar 20, 2015 at 3:36 PM, sunil kalva <sa...@gmail.com> wrote:
>> > I think KafkaProducer.send method blocks until it fetches partition
>> > metadata for configured time using "metadata.fetch.timeout.ms", once
>> time
>> > out it throws TimeoutException. You might be experiencing
>> TimeoutException ?
>>
>> My co-worker pointed out that over here:
>>
>> https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L368
>>
>> waitOnMetadata just returns. The else branch of the code is not
>> executed when Kafka is unreachable.
>>
>> Trying to investigate what else must be causing the wait.
>>
>
>
>
> --
> SunilKalva

Re: New Java Producer Client handling case where Kafka is unreachable

Posted by sunil kalva <sa...@gmail.com>.
@Samuel
My point was:
The else branch of the code will be executed when metadata is not
available, and metadata is not available when the Kafka cluster is not
reachable.

Please correct me if I am wrong.

On Fri, Mar 20, 2015 at 3:43 PM, Samuel Chase <sa...@gmail.com> wrote:

> @Sunil
>
> On Fri, Mar 20, 2015 at 3:36 PM, sunil kalva <sa...@gmail.com> wrote:
> > I think KafkaProducer.send method blocks until it fetches partition
> > metadata for configured time using "metadata.fetch.timeout.ms", once
> time
> > out it throws TimeoutException. You might be experiencing
> TimeoutException ?
>
> My co-worker pointed out that over here:
>
> https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L368
>
> waitOnMetadata just returns. The else branch of the code is not
> executed when Kafka is unreachable.
>
> Trying to investigate what else must be causing the wait.
>



-- 
SunilKalva

Re: New Java Producer Client handling case where Kafka is unreachable

Posted by Samuel Chase <sa...@gmail.com>.
@Sunil

On Fri, Mar 20, 2015 at 3:36 PM, sunil kalva <sa...@gmail.com> wrote:
> I think KafkaProducer.send method blocks until it fetches partition
> metadata for configured time using "metadata.fetch.timeout.ms", once time
> out it throws TimeoutException. You might be experiencing TimeoutException ?

My co-worker pointed out that over here:
https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L368

waitOnMetadata just returns. The else branch of the code is not
executed when Kafka is unreachable.

Trying to investigate what else must be causing the wait.

Re: New Java Producer Client handling case where Kafka is unreachable

Posted by sunil kalva <sa...@gmail.com>.
I think the KafkaProducer.send method blocks until it fetches partition
metadata, for at most the time configured by "metadata.fetch.timeout.ms";
once it times out, it throws a TimeoutException. You might be experiencing
a TimeoutException?

ref: KafkaProducer.java(waitOnMetadata)
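
Something like this is what I am suggesting (only a sketch; the broker
address, topic and the 1-second timeout are arbitrary):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.TimeoutException;

    public class MetadataTimeoutSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            // give up on the blocking metadata fetch after 1 second
            props.put("metadata.fetch.timeout.ms", "1000");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            try {
                ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>("my-topic", "value");
                producer.send(record);
            } catch (TimeoutException e) {
                // thrown when partition metadata cannot be fetched within
                // metadata.fetch.timeout.ms
            } finally {
                producer.close();
            }
        }
    }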

On Fri, Mar 20, 2015 at 2:42 PM, Samuel Chase <sa...@gmail.com> wrote:

> @Tao,
>
> On Fri, Mar 20, 2015 at 12:39 PM, tao xiao <xi...@gmail.com> wrote:
> > You can set producer property retries not equal to 0. Details can be
> found
> > here
> > http://kafka.apache.org/documentation.html#newproducerconfigs
>
> I set "retries" to "1", but send is still blocking until the Kafka
> Server is reachable again.
>
> @Everyone,
>
> After successful initialization of the KafkaProducer, If Kafka becomes
> unreachable for a short while, any further sends wait indefinitely
> (when calling .get() on the future returned by .send()). Is there any
> way of detecting that Kafka is unreachable?
>
> Samuel
>



-- 
SunilKalva

Re: New Java Producer Client handling case where Kafka is unreachable

Posted by Samuel Chase <sa...@gmail.com>.
@Tao,

On Fri, Mar 20, 2015 at 12:39 PM, tao xiao <xi...@gmail.com> wrote:
> You can set producer property retries not equal to 0. Details can be found
> here
> http://kafka.apache.org/documentation.html#newproducerconfigs

I set "retries" to "1", but send is still blocking until the Kafka
server is reachable again.

@Everyone,

After successful initialization of the KafkaProducer, if Kafka becomes
unreachable for a short while, any further sends wait indefinitely
(when calling .get() on the future returned by .send()). Is there any
way of detecting that Kafka is unreachable?

Samuel

Re: New Java Producer Client handling case where Kafka is unreachable

Posted by Samuel Chase <sa...@gmail.com>.
Hello Tao,

On Fri, Mar 20, 2015 at 12:39 PM, tao xiao <xi...@gmail.com> wrote:
> You can set producer property retries not equal to 0. Details can be found
> here
> http://kafka.apache.org/documentation.html#newproducerconfigs

Thanks! I shall try that.

Samuel

Re: New Java Producer Client handling case where Kafka is unreachable

Posted by tao xiao <xi...@gmail.com>.
You can set the producer property "retries" to a value other than 0. Details
can be found here:
http://kafka.apache.org/documentation.html#newproducerconfigs

On Fri, Mar 20, 2015 at 3:01 PM, Samuel Chase <sa...@gmail.com> wrote:

> Hello Everyone,
>
> In the new Java Producer API, the Callback code in
> KafkaProducer.send is run after there is a response from the Kafka
> server. This can be used if some error handling needs to be done based
> on the response.
>
> When using the new Java Kafka Producer, I've noticed that when the
> Kafka server is down/unreachable, KafkaProducer.send blocks until the
> Kafka server is back up again.
>
> We've been using the older Scala Producer and when Kafka is
> unreachable it throws an exception after a few retries. This exception
> is caught and then some error handling code is run.
>
> - What is the recommended way of using the new Java Producer API to
> handle the case where Kafka is unreachable temporarily?
>
> I don't want to wait until it is reachable again before I know that
> the send failed.
>
> Any help or advice would be much appreciated.
>
> Thanks,
>
> Samuel
>



-- 
Regards,
Tao