Posted to users@kafka.apache.org by Oleg Zhurakousky <oz...@hortonworks.com> on 2016/04/11 21:45:38 UTC

"Close the consumer, waiting indefinitely for any needed cleanup."

The subject line is from the javadoc of the new KafkaConsumer.
Is this for real? I am hoping the use of ‘indefinitely’ is a typo.
In any event, if it is indeed true, how does one break out of an indefinitely blocking consumer.close() invocation?

Cheers
Oleg

Re: "Close the consumer, waiting indefinitely for any needed cleanup."

Posted by Dana Powers <da...@gmail.com>.
If you pay me, I might write the code for you, too ;)

-Dana

On Mon, Apr 11, 2016 at 1:34 PM, Oleg Zhurakousky
<oz...@hortonworks.com> wrote:
> Dana, I am sorry, but I can’t accept that as an answer.
> Regardless, the API exposed to the end user must never “block indefinitely”. And saying you have to move a few mountains to work around what most would perceive to be a design issue is not an acceptable answer.
> I’ll raise the JIRA
>
> Cheers
> Oleg
>
>> On Apr 11, 2016, at 4:25 PM, Dana Powers <da...@gmail.com> wrote:
>>
>> If you wanted to implement a timeout, you'd need to wire it up in
>> commitOffsetsSync and plumb the timeout from Coordinator.close() and
>> Consumer.close(). That's your answer. Code changes required.
>>
>> -Dana
>>
>> On Mon, Apr 11, 2016 at 1:17 PM, Oleg Zhurakousky
>> <oz...@hortonworks.com> wrote:
>>> Dana
>>> Everything you are saying does not answer my question of how to interrupt a potential deadlock artificially forced upon users of the KafkaConsumer API.
>>> I may be OK with duplicate messages, I may be OK with data loss, and I am OK with doing extra work to do all kinds of things. I am NOT OK with getting stuck on a close() call when I really want my system that uses KafkaConsumer to exit. So Consumer.close(timeout) is what I was really asking about.
>>> So, is there a way now to interrupt such a block?
>>>
>>> Cheers
>>> Oleg
>>>
>>>> On Apr 11, 2016, at 4:08 PM, Dana Powers <da...@gmail.com> wrote:
>>>>
>>>> Not a typo. This happens because the consumer closes the coordinator,
>>>> and the coordinator attempts to commit any pending offsets
>>>> synchronously in order to avoid duplicate message delivery. The
>>>> Coordinator method commitOffsetsSync will retry indefinitely unless a
>>>> non-recoverable error is encountered. If you wanted to implement a
>>>> timeout, you'd need to wire it up in commitOffsetsSync and plumb the
>>>> timeout from Coordinator.close() and Consumer.close(). It doesn't look
>>>> terribly complicated, but you should check on the dev list for more
>>>> opinions.
>>>>
>>>> -Dana
>>>>
>>>> On Mon, Apr 11, 2016 at 12:45 PM, Oleg Zhurakousky
>>>> <oz...@hortonworks.com> wrote:
>>>>> The subject line is from the javadoc of the new KafkaConsumer.
>>>>> Is this for real? I am hoping the use of ‘indefinitely’ is a typo.
>>>>> In any event, if it is indeed true, how does one break out of an indefinitely blocking consumer.close() invocation?
>>>>>
>>>>> Cheers
>>>>> Oleg
>>>>
>>>
>>
>

Re: "Close the consumer, waiting indefinitely for any needed cleanup."

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
Dana, I am sorry, but I can’t accept that as an answer. 
Regardless, the API exposed to the end user must never “block indefinitely”. And saying you have to move a few mountains to work around what most would perceive to be a design issue is not an acceptable answer.
I’ll raise the JIRA

Cheers
Oleg

> On Apr 11, 2016, at 4:25 PM, Dana Powers <da...@gmail.com> wrote:
> 
> If you wanted to implement a timeout, you'd need to wire it up in
> commitOffsetsSync and plumb the timeout from Coordinator.close() and
> Consumer.close(). That's your answer. Code changes required.
> 
> -Dana
> 
> On Mon, Apr 11, 2016 at 1:17 PM, Oleg Zhurakousky
> <oz...@hortonworks.com> wrote:
>> Dana
>> Everything you are saying does not answer my question of how to interrupt a potential deadlock artificially forced upon users of the KafkaConsumer API.
>> I may be OK with duplicate messages, I may be OK with data loss, and I am OK with doing extra work to do all kinds of things. I am NOT OK with getting stuck on a close() call when I really want my system that uses KafkaConsumer to exit. So Consumer.close(timeout) is what I was really asking about.
>> So, is there a way now to interrupt such a block?
>> 
>> Cheers
>> Oleg
>> 
>>> On Apr 11, 2016, at 4:08 PM, Dana Powers <da...@gmail.com> wrote:
>>> 
>>> Not a typo. This happens because the consumer closes the coordinator,
>>> and the coordinator attempts to commit any pending offsets
>>> synchronously in order to avoid duplicate message delivery. The
>>> Coordinator method commitOffsetsSync will retry indefinitely unless a
>>> non-recoverable error is encountered. If you wanted to implement a
>>> timeout, you'd need to wire it up in commitOffsetsSync and plumb the
>>> timeout from Coordinator.close() and Consumer.close(). It doesn't look
>>> terribly complicated, but you should check on the dev list for more
>>> opinions.
>>> 
>>> -Dana
>>> 
>>> On Mon, Apr 11, 2016 at 12:45 PM, Oleg Zhurakousky
>>> <oz...@hortonworks.com> wrote:
>>>> The subject line is from the javadoc of the new KafkaConsumer.
>>>> Is this for real? I am hoping the use of ‘indefinitely’ is a typo.
>>>> In any event, if it is indeed true, how does one break out of an indefinitely blocking consumer.close() invocation?
>>>> 
>>>> Cheers
>>>> Oleg
>>> 
>> 
> 


Re: "Close the consumer, waiting indefinitely for any needed cleanup."

Posted by Dana Powers <da...@gmail.com>.
If you wanted to implement a timeout, you'd need to wire it up in
commitOffsetsSync and plumb the timeout from Coordinator.close() and
Consumer.close(). That's your answer. Code changes required.

-Dana

On Mon, Apr 11, 2016 at 1:17 PM, Oleg Zhurakousky
<oz...@hortonworks.com> wrote:
> Dana
> Everything you are saying does not answer my question of how to interrupt a potential deadlock artificially forced upon users of the KafkaConsumer API.
> I may be OK with duplicate messages, I may be OK with data loss, and I am OK with doing extra work to do all kinds of things. I am NOT OK with getting stuck on a close() call when I really want my system that uses KafkaConsumer to exit. So Consumer.close(timeout) is what I was really asking about.
> So, is there a way now to interrupt such a block?
>
> Cheers
> Oleg
>
>> On Apr 11, 2016, at 4:08 PM, Dana Powers <da...@gmail.com> wrote:
>>
>> Not a typo. This happens because the consumer closes the coordinator,
>> and the coordinator attempts to commit any pending offsets
>> synchronously in order to avoid duplicate message delivery. The
>> Coordinator method commitOffsetsSync will retry indefinitely unless a
>> non-recoverable error is encountered. If you wanted to implement a
>> timeout, you'd need to wire it up in commitOffsetsSync and plumb the
>> timeout from Coordinator.close() and Consumer.close(). It doesn't look
>> terribly complicated, but you should check on the dev list for more
>> opinions.
>>
>> -Dana
>>
>> On Mon, Apr 11, 2016 at 12:45 PM, Oleg Zhurakousky
>> <oz...@hortonworks.com> wrote:
>>> The subject line is from the javadoc of the new KafkaConsumer.
>>> Is this for real? I am hoping the use of ‘indefinitely’ is a typo.
>>> In any event, if it is indeed true, how does one break out of an indefinitely blocking consumer.close() invocation?
>>>
>>> Cheers
>>> Oleg
>>
>
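The plumbing Dana describes could look roughly like the sketch below. The names mirror the discussion (commitOffsetsSync, close) but are simplified stand-ins, not actual Kafka internals: the retrying commit loop gets a deadline, so close(timeout) returns even when the broker never acknowledges the commit.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a bounded close: the timeout is plumbed from
// close() into the coordinator's synchronous commit retry loop, which
// in the client under discussion retries indefinitely.
public class BoundedCloseSketch {

    // Stand-in for the consumer coordinator. commitAttempt models one
    // round-trip to the broker: true means the commit succeeded.
    static class Coordinator {
        private final BooleanSupplier commitAttempt;

        Coordinator(BooleanSupplier commitAttempt) {
            this.commitAttempt = commitAttempt;
        }

        // Retries the commit until it succeeds or the deadline passes,
        // instead of retrying forever. Returns true on success.
        boolean commitOffsetsSync(long timeoutMs) {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (System.nanoTime() < deadline) {
                if (commitAttempt.getAsBoolean()) {
                    return true;   // offsets committed
                }
                // brief backoff before the next retry
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return false;          // deadline reached; gave up
        }
    }

    // Stand-in for Consumer.close(timeout): delegates the bound downward.
    static boolean close(Coordinator coordinator, long timeoutMs) {
        return coordinator.commitOffsetsSync(timeoutMs);
    }

    public static void main(String[] args) {
        // A broker that never acknowledges: close() still returns promptly.
        Coordinator stuck = new Coordinator(() -> false);
        System.out.println("committed=" + close(stuck, 200));
    }
}
```

The point of the sketch is only that the change is a matter of threading a deadline through two method signatures, which matches Dana's "doesn't look terribly complicated".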

Re: "Close the consumer, waiting indefinitely for any needed cleanup."

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
Dana
Everything you are saying does not answer my question of how to interrupt a potential deadlock artificially forced upon users of the KafkaConsumer API.
I may be OK with duplicate messages, I may be OK with data loss, and I am OK with doing extra work to do all kinds of things. I am NOT OK with getting stuck on a close() call when I really want my system that uses KafkaConsumer to exit. So Consumer.close(timeout) is what I was really asking about.
So, is there a way now to interrupt such a block?

Cheers
Oleg

> On Apr 11, 2016, at 4:08 PM, Dana Powers <da...@gmail.com> wrote:
> 
> Not a typo. This happens because the consumer closes the coordinator,
> and the coordinator attempts to commit any pending offsets
> synchronously in order to avoid duplicate message delivery. The
> Coordinator method commitOffsetsSync will retry indefinitely unless a
> non-recoverable error is encountered. If you wanted to implement a
> timeout, you'd need to wire it up in commitOffsetsSync and plumb the
> timeout from Coordinator.close() and Consumer.close(). It doesn't look
> terribly complicated, but you should check on the dev list for more
> opinions.
> 
> -Dana
> 
> On Mon, Apr 11, 2016 at 12:45 PM, Oleg Zhurakousky
> <oz...@hortonworks.com> wrote:
>> The subject line is from the javadoc of the new KafkaConsumer.
>> Is this for real? I am hoping the use of ‘indefinitely’ is a typo.
>> In any event, if it is indeed true, how does one break out of an indefinitely blocking consumer.close() invocation?
>> 
>> Cheers
>> Oleg
> 


Re: "Close the consumer, waiting indefinitely for any needed cleanup."

Posted by Dana Powers <da...@gmail.com>.
Not a typo. This happens because the consumer closes the coordinator,
and the coordinator attempts to commit any pending offsets
synchronously in order to avoid duplicate message delivery. The
Coordinator method commitOffsetsSync will retry indefinitely unless a
non-recoverable error is encountered. If you wanted to implement a
timeout, you'd need to wire it up in commitOffsetsSync and plumb the
timeout from Coordinator.close() and Consumer.close(). It doesn't look
terribly complicated, but you should check on the dev list for more
opinions.

-Dana

On Mon, Apr 11, 2016 at 12:45 PM, Oleg Zhurakousky
<oz...@hortonworks.com> wrote:
> The subject line is from the javadoc of the new KafkaConsumer.
> Is this for real? I am hoping the use of ‘indefinitely’ is a typo.
> In any event, if it is indeed true, how does one break out of an indefinitely blocking consumer.close() invocation?
>
> Cheers
> Oleg
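Until such a client-side change lands, one caller-side workaround is to run close() on its own thread and bound the wait with Future.get(timeout). The sketch below simulates a hanging close() with a plain Runnable; with a real consumer you would submit consumer::close instead. (Later Kafka client releases did eventually add a close variant that accepts a timeout.)

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Bound an unbounded close() from the caller's side: run it on a daemon
// thread and stop waiting after timeoutMs, so the JVM can still exit.
public class CloseWithTimeout {

    // Returns true if closeTask finished within timeoutMs, false if we
    // gave up waiting on it.
    static boolean closeWithTimeout(Runnable closeTask, long timeoutMs) {
        ExecutorService ex = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "consumer-close");
            t.setDaemon(true);   // a hung close() won't keep the JVM alive
            return t;
        });
        try {
            Future<?> f = ex.submit(closeTask);
            f.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;                 // close() completed in time
        } catch (TimeoutException e) {
            return false;                // close() is stuck; stop waiting
        } catch (InterruptedException | ExecutionException e) {
            return false;
        } finally {
            ex.shutdownNow();            // interrupt the close thread
        }
    }

    public static void main(String[] args) {
        // Simulate a close() that blocks forever, like the indefinite
        // commitOffsetsSync retry loop described above.
        Runnable blockingClose = () -> {
            while (true) {
                try { Thread.sleep(100); } catch (InterruptedException e) { return; }
            }
        };
        System.out.println(closeWithTimeout(blockingClose, 300)); // prints false
        System.out.println(closeWithTimeout(() -> {}, 300));      // prints true
    }
}
```

Note the trade-off: abandoning a stuck close() means any pending offset commit is lost, so duplicate delivery on restart is possible — which is exactly the behavior the indefinite retry was trying to avoid.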