Posted to users@kafka.apache.org by Oleg Zhurakousky <oz...@hortonworks.com> on 2016/04/07 15:04:49 UTC

KafkaProducer block on send

I know it’s been discussed before, but that conversation never really concluded with any reasonable explanation, so I am bringing it up again, as I believe this is a bug that needs to be fixed in a future release.
Can someone please explain the rationale for the following code in KafkaProducer:

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        try {
            // first make sure the metadata for the topic is available
            long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
. . .
}

By definition, a method that returns a Future implies that the caller decides how long to wait for completion via Future.get(TIMETOWAIT). In this case there is an explicit blocking call (waitOnMetadata) that can hang indefinitely (regardless of the reason), which essentially deadlocks the user’s code, since the Future may never be returned in the first place.
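The concern above can be illustrated without Kafka at all. The following is a minimal sketch, not KafkaProducer code: `sendLike` is a hypothetical stand-in whose `Thread.sleep` plays the role of `waitOnMetadata`. It only shows that any time spent before the Future is returned is outside the reach of the caller's own `Future.get` timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a producer; none of this is Kafka code.
class BlockingBeforeFutureDemo {

    // Mimics the shape of send(): blocks for blockMs *before* handing back a Future.
    static Future<String> sendLike(long blockMs) {
        try {
            Thread.sleep(blockMs); // stands in for waitOnMetadata(...)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return CompletableFuture.completedFuture("ack");
    }

    // Measures how long the caller was stuck before it even received the Future.
    static long millisBeforeFutureReturned(long blockMs) {
        long start = System.nanoTime();
        Future<String> future = sendLike(blockMs);
        // Only from this point on could the caller apply future.get(timeout, unit).
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("caller blocked for about "
                + millisBeforeFutureReturned(200) + " ms before any Future existed");
    }
}
```

In this sketch the caller has no handle to cancel or time out on while `sendLike` is sleeping, which is the situation the message describes.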

Thoughts?

Oleg


RE: KafkaProducer block on send

Posted by Paolo Patierno <pp...@live.com>.
Sorry ... the callback is called with the exception, so I can check inside it ... btw send() shouldn't be blocking.
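The other check discussed in this thread, inspecting the returned Future straight away, can be sketched with the JDK alone. This is a minimal sketch under stated assumptions: `failedSend` is a hypothetical stand-in for a send that failed before anything was queued, and the exception type and message are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; it does not use or depend on the Kafka client.
class FailedSendCheck {

    // Stand-in for a send() that failed up front: the Future is already
    // completed exceptionally by the time the caller sees it.
    static Future<String> failedSend() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new TimeoutException("metadata not available"));
        return future;
    }

    // Returns the failure cause if the Future is already done and failed, else null.
    static Throwable immediateFailure(Future<?> future) {
        if (!future.isDone()) {
            return null; // still in flight; nothing to report yet
        }
        try {
            future.get(); // does not block, because the Future is already done
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
            return e;
        } catch (ExecutionException e) {
            return e.getCause(); // the underlying error
        }
    }

    public static void main(String[] args) {
        System.out.println("immediate failure: " + immediateFailure(failedSend()));
    }
}
```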

Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor
Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience

> From: ppatierno@live.com
> To: users@kafka.apache.org
> Subject: RE: KafkaProducer block on send
> Date: Wed, 4 May 2016 07:24:25 +0000
> 
> Hi Oleg,
> 
> can you share the JIRA link here because I totally agree with you.
> For me the send() should be totally asynchronous and not blocking for the max.block.ms timeout.
> 
> Currently I'm using the overload with callback that, of course, isn't called if the send() fails due to timeout.
> In order to catch this scenario I need to do the following :
> 
> Future<RecordMetadata> future = this.producer.send(....);
> 
> if (future.isDone()) {
>     try {
>         future.get();
>     } catch (InterruptedException e) {
>         // restore the interrupt flag rather than swallowing it
>         Thread.currentThread().interrupt();
>     } catch (ExecutionException e) {
>         // the send failed; e.getCause() holds the underlying error
>         e.printStackTrace();
>     }
> }
> 
> I don't like it so much ...
> 
> Thanks,
> Paolo.
> 
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
> Twitter : @ppatierno
> Linkedin : paolopatierno
> Blog : DevExperience
> 
> > Subject: Re: KafkaProducer block on send
> > From: ozhurakousky@hortonworks.com
> > To: users@kafka.apache.org
> > Date: Mon, 11 Apr 2016 19:42:17 +0000
> > 
> > Dana
> > 
> > Thanks for the explanation, but it sounds more like a workaround, since everything you describe could be encapsulated within the Future itself. After all, it "represents the result of an asynchronous computation".
> > 
> > executor.submit(new Callable<RecordMetadata>() {
> >     @Override
> >     public RecordMetadata call() throws Exception {
> >         // first make sure the metadata for the topic is available
> >         long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
> >         . . .
> >     }
> > });
> > 
> > 
> > The above would eliminate the confusion and keep the user in control, so that even a legitimate blockage could be interrupted, canceled, etc., based on various business/infrastructure requirements.
> > Anyway, I’ll raise the issue in JIRA and reference this thread.
> > 
> > Cheers
> > Oleg
> > 
> > On Apr 8, 2016, at 10:31 AM, Dana Powers <da...@gmail.com> wrote:
> > 
> > The prior discussion explained:
> > 
> > (1) The code you point to blocks for a maximum of max.block.ms, which is
> > user configurable. It does not block indefinitely with no user control as
> > you suggest. You are free to configure this to 0 if you like, and it will not
> > block at all. Have you tried this like I suggested before?
> > 
> > (2) Even if you convinced people to remove waitOnMetadata, the send method
> > *still* blocks on memory back pressure (also configured by max.block.ms).
> > This is for good reason:
> > 
> > while True:
> >  producer.send(msg)
> > 
> > can quickly devour all of your local memory and crash your process if the
> > outflow rate decreases, say if brokers go down or a network partition occurs.
> > 
> > -Dana
> > 
> > I totally agree with Oleg.
> > 
> > As the documentation says, producers send data asynchronously, and this
> > is reinforced by the send method signature, which returns a Future.
> > It can't block indefinitely without returning to the caller.
> > I mean, you can decide that the code inside the send method blocks
> > indefinitely, but in an "asynchronous way": it should first return a Future
> > to the caller, who can then handle it.
> > 
> > Paolo.
> > 
> > Paolo Patierno
> > Senior Software Engineer (IoT) @ Red Hat
> > Microsoft MVP on Windows Embedded & IoT
> > Microsoft Azure Advisor
> > Twitter : @ppatierno
> > Linkedin : paolopatierno
> > Blog : DevExperience
> > 

Re: KafkaProducer block on send

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
When I get a chance to reproduce the scenario I’ll follow up, but regardless, the real question is: when send is advertised as an "async send", how can it possibly block? It’s not async then. What’s the rationale behind that?
You are returning the Future (which is good), essentially delegating back to the caller the decision of how long it is willing to wait for the result of the invocation.
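The delegation described here, where the caller decides how long to wait, is what a caller gets by wrapping a potentially blocking call in an executor. The following is a caller-side sketch only, not a description of how KafkaProducer works or should work: `submitWithDeadline` is a hypothetical helper, and the sleeping task stands in for any call that may block.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical caller-side wrapper; not part of any Kafka API.
class AsyncWrapperSketch {

    // Runs a task that blocks for workMs on another thread and waits at most waitMs for it.
    static String submitWithDeadline(long workMs, long waitMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The Future is handed back immediately; the blocking happens elsewhere.
            Future<String> future = executor.submit(() -> {
                Thread.sleep(workMs); // stands in for the blocking part of a send
                return "ack";
            });
            try {
                return future.get(waitMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the worker; the caller stays in control
                return "timed out";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            } catch (ExecutionException e) {
                return "failed: " + e.getCause();
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitWithDeadline(2000, 50));
        System.out.println(submitWithDeadline(10, 2000));
    }
}
```

The trade-off is an extra thread and queue in front of the producer, so any memory back pressure would have to be handled by the wrapper as well.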

Oleg

> On May 4, 2016, at 1:08 PM, Mayuresh Gharat <gh...@gmail.com> wrote:
> 
> I am not sure why max.block.ms does not suffice here?
> Also, waitOnMetadata will block only the first time; later on it
> will have the metadata. I am not able to understand the motivation here.
> Can you explain with an example?
> 
> Thanks,
> 
> Mayuresh
> 
> On Wed, May 4, 2016 at 9:55 AM, Dana Powers <da...@gmail.com> wrote:
> 
>> I think changes of this sort (design changes as opposed to bugs) typically
>> go through a KIP process before work is assigned. You might consider
>> starting a KIP discussion and see if there is interest in pursuing your
>> proposed changes.
>> 
>> -Dana
>> On May 4, 2016 7:58 AM, "Oleg Zhurakousky" <oz...@hortonworks.com>
>> wrote:
>> 
>>> Indeed it is.
>>> 
>>> Oleg
>>>> On May 4, 2016, at 10:54 AM, Paolo Patierno <pp...@live.com>
>> wrote:
>>>> 
>>>> It's sad that after almost one month it's still "unassigned" :-(
>>>> 
>>>> Paolo Patierno
>>>> Senior Software Engineer (IoT) @ Red Hat
>>>> Microsoft MVP on Windows Embedded & IoT
>>>> Microsoft Azure Advisor
>>>> Twitter : @ppatierno
>>>> Linkedin : paolopatierno
>>>> Blog : DevExperience
>>>> 
>>>>> Subject: Re: KafkaProducer block on send
>>>>> From: ozhurakousky@hortonworks.com
>>>>> To: users@kafka.apache.org
>>>>> Date: Wed, 4 May 2016 14:47:25 +0000
>>>>> 
>>>>> Sure
>>>>> 
>>>>> Here are both:
>>>>> https://issues.apache.org/jira/browse/KAFKA-3539
>>>>> https://issues.apache.org/jira/browse/KAFKA-3540
> 
> 
> 
> -- 
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125


Re: KafkaProducer block on send

Posted by Mayuresh Gharat <gh...@gmail.com>.
I am not sure why max.block.ms does not suffice here?
Also, waitOnMetadata will block only the first time; later on it
will have the metadata. I am not able to understand the motivation here.
Can you explain with an example?
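For reference, the max.block.ms setting mentioned here is an ordinary producer configuration entry. Below is a minimal sketch of building such a configuration with plain `java.util.Properties`; the class and method names are hypothetical, the broker address is a placeholder, and the resulting object is assumed to be passed to a producer constructor elsewhere. Note that a very small value may cause the first send to fail rather than wait if metadata has not been fetched yet.

```java
import java.util.Properties;

// Hypothetical helper that only assembles configuration; it does not create a producer.
class ProducerBlockingConfig {

    // Builds producer properties that cap how long send() may block.
    static Properties withMaxBlockMs(long maxBlockMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("max.block.ms", Long.toString(maxBlockMs));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withMaxBlockMs(500));
    }
}
```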

Thanks,

Mayuresh

On Wed, May 4, 2016 at 9:55 AM, Dana Powers <da...@gmail.com> wrote:

> I think changes of this sort (design changes as opposed to bugs) typically
> go through a KIP process before work is assigned. You might consider
> starting a KIP discussion and see if there is interest in pursuing your
> proposed changes.
>
> -Dana



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125

Re: KafkaProducer block on send

Posted by Dana Powers <da...@gmail.com>.
I think changes of this sort (design changes as opposed to bugs) typically
go through a KIP process before work is assigned. You might consider
starting a KIP discussion and see if there is interest in pursuing your
proposed changes.

-Dana
On May 4, 2016 7:58 AM, "Oleg Zhurakousky" <oz...@hortonworks.com>
wrote:

> Indeed it is.
>
> Oleg
> > On May 4, 2016, at 10:54 AM, Paolo Patierno <pp...@live.com> wrote:
> >
> > It's sad that after almost one month it's still "unassigned" :-(
> >
> > Paolo Patierno
> > Senior Software Engineer (IoT) @ Red Hat
> > Microsoft MVP on Windows Embedded & IoT
> > Microsoft Azure Advisor
> > Twitter : @ppatierno
> > Linkedin : paolopatierno
> > Blog : DevExperience

Re: KafkaProducer block on send

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
Indeed it is.

Oleg
> On May 4, 2016, at 10:54 AM, Paolo Patierno <pp...@live.com> wrote:
> 
> It's sad that after almost one month it's still "unassigned" :-(
> 
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
> Twitter : @ppatierno
> Linkedin : paolopatierno
> Blog : DevExperience


RE: KafkaProducer block on send

Posted by Paolo Patierno <pp...@live.com>.
It's sad that after almost one month it's still "unassigned" :-(

Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor
Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience


Re: KafkaProducer block on send

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
Sure

Here are both:
https://issues.apache.org/jira/browse/KAFKA-3539
https://issues.apache.org/jira/browse/KAFKA-3540


RE: KafkaProducer block on send

Posted by Paolo Patierno <pp...@live.com>.
Hi Oleg,

Can you share the JIRA link here? I totally agree with you.
For me, send() should be totally asynchronous and should not block for the max.block.ms timeout.

Currently I'm using the overload with a callback that, of course, isn't called if send() fails due to a timeout.
In order to catch this scenario I need to do the following:

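Future<RecordMetadata> future = this.producer.send(....);

// A Future that is already done at this point means send() finished
// (or failed) synchronously, so get() will not block.
if (future.isDone()) {
    try {
        future.get();
    } catch (InterruptedException e) {
        // The calling thread was interrupted while waiting.
        e.printStackTrace();
    } catch (ExecutionException e) {
        // e.getCause() holds the actual send failure.
        e.printStackTrace();
    }
}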

I don't like it so much ...

Thanks,
Paolo.

Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor
Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience
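
The isDone()/get() check above can be folded into a small helper. This is only a sketch: the class and method names (FutureFailureCheck, immediateFailure) are hypothetical, it works on any java.util.concurrent.Future rather than on KafkaProducer specifically, and a CompletableFuture stands in for the Future that send() would return.

```java
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureFailureCheck {

    // Returns the failure of a Future that has ALREADY completed, without
    // blocking. A Future that is still pending, or that completed normally,
    // yields Optional.empty().
    public static Optional<Throwable> immediateFailure(Future<?> future) {
        if (!future.isDone()) {
            return Optional.empty();
        }
        try {
            future.get(); // returns at once because the Future is done
            return Optional.empty();
        } catch (ExecutionException e) {
            // Unwrap the real cause when there is one.
            return Optional.of(e.getCause() != null ? e.getCause() : e);
        } catch (CancellationException e) {
            return Optional.of(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return Optional.of(e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for a Future that failed before being handed back.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(immediateFailure(failed).isPresent());
        System.out.println(immediateFailure(new CompletableFuture<String>()).isPresent());
    }
}
```

The helper only reports failures that are visible right away; a Future that is still in flight has to be checked later or handled through the callback.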


Re: KafkaProducer block on send

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
Dana

Thanks for the explanation, but it sounds more like a workaround, since everything you describe could be encapsulated within the Future itself. After all, it "represents the result of an asynchronous computation".

executor.submit(new Callable<RecordMetadata>() {
    @Override
    public RecordMetadata call() throws Exception {
        // first make sure the metadata for the topic is available
        // (KafkaProducer.this: a bare "this" here would be the anonymous Callable)
        long waitedOnMetadataMs = waitOnMetadata(record.topic(), KafkaProducer.this.maxBlockTimeMs);
        . . .
    }
});


The above would eliminate the confusion and keep the user in control, so that even a legitimate blockage could be interrupted, canceled, etc., based on various business or infrastructure requirements.
Anyway, I’ll raise the issue in JIRA and reference this thread.

Cheers
Oleg
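
For anyone who wants this behavior from the caller's side, here is a minimal sketch of the same idea outside the producer: hand the potentially blocking call to an executor so that a Future comes back at once, then apply your own timeout or cancel. Everything here is an assumption for illustration. blockingSend is a hypothetical stand-in that just sleeps, and none of it is KafkaProducer code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncSendSketch {

    // Hypothetical stand-in for a call that may block, e.g. while waiting on metadata.
    public static String blockingSend(String record, long blockMs) throws InterruptedException {
        Thread.sleep(blockMs);
        return "sent:" + record;
    }

    // Returns immediately; any blocking happens on the executor thread.
    public static Future<String> sendAsync(ExecutorService executor, String record, long blockMs) {
        return executor.submit(() -> blockingSend(record, blockMs));
    }

    // Waits at most timeoutMs, then cancels (interrupts) the task and gives up.
    public static String sendWithTimeout(ExecutorService executor, String record,
                                         long blockMs, long timeoutMs) throws Exception {
        Future<String> future = sendAsync(executor, record, blockMs);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);
            return "timed out";
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Short block, generous timeout: completes normally.
            System.out.println(sendWithTimeout(executor, "a", 10, 2000));
            // Long block, short timeout: the caller stays in control.
            System.out.println(sendWithTimeout(executor, "b", 5000, 100));
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Note that an executor like this uses an unbounded work queue by default, so a caller that submits faster than the sends complete would need a bounded queue to avoid the memory growth Dana describes.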




RE: KafkaProducer block on send

Posted by Dana Powers <da...@gmail.com>.
The prior discussion explained:

(1) The code you point to blocks for a maximum of max.block.ms, which is
user configurable. It does not block indefinitely with no user control as
you suggest. You are free to configure this to 0 if you like and it will not
block at all. Have you tried this like I suggested before?

(2) Even if you convinced people to remove waitOnMetadata, the send method
*still* blocks on memory back pressure (also configured by max.block.ms).
This is for good reason:

while True:
  producer.send(msg)

can quickly devour all of your local memory and crash your process if the
outflow rate decreases, say if brokers go down or a network partition occurs.

-Dana
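
To make point (1) concrete, here is a minimal sketch of the producer properties involved. max.block.ms and buffer.memory are producer configuration keys; the class and method names, the broker address and the buffer size are placeholders. The KafkaProducer construction is left as a comment so the snippet runs without the Kafka client on the classpath.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Builds producer properties in which send() is not allowed to wait:
    // max.block.ms bounds both the metadata wait and the wait for buffer space.
    public static Properties nonBlockingProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 0 = do not block in send(); it should fail fast instead of waiting.
        props.put("max.block.ms", "0");
        // Size in bytes of the in-memory buffer that provides the back pressure.
        props.put("buffer.memory", "33554432");
        return props;
    }

    public static void main(String[] args) {
        Properties props = nonBlockingProducerProps("localhost:9092"); // placeholder address
        System.out.println(props.getProperty("max.block.ms"));
        // With the Kafka client available, the properties would be passed on:
        // KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    }
}
```

With max.block.ms at 0 the caller has to be ready for send() to fail immediately when metadata or buffer space is not available, rather than waiting for either.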

RE: KafkaProducer block on send

Posted by Paolo Patierno <pp...@live.com>.
I totally agree with Oleg.

As the documentation says, the producer sends data asynchronously, and this is reinforced by the send method's signature, which returns a Future.
It can't block indefinitely without returning to the caller.
I mean, you can decide that the code inside the send method blocks indefinitely, but in an "asynchronous way": it should first return a Future that the caller can handle.

Paolo.

Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor
Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience
