Posted to users@kafka.apache.org by Jay Kreps <ja...@gmail.com> on 2015/02/08 19:25:54 UTC

[DISCUSS] KIP-8 Add a flush method to the new Java producer

Following up on our previous thread on making batch send a little easier,
here is a concrete proposal to add a flush() method to the producer:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API

A proposed implementation is here:
https://issues.apache.org/jira/browse/KAFKA-1865

Thoughts?

-Jay

Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Joel Koshy <jj...@gmail.com>.
> silly. But in the absence of flush there is no way to say that. As you say
> you only pay that penalty on one of the get() calls, but if the linger.ms
> is high (say 60 seconds) that will be a huge penalty.

That makes sense - thanks for clarifying.


Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Jay Kreps <ja...@gmail.com>.
Hey Joel,

The use case would be for something like mirror maker. You want to do
something like the following:

while(true) {
  val recs = consumer.poll(time);
  for(rec <- recs)
    producer.send(rec);
  producer.flush();
  consumer.commit();
}

If you replace flush() with just calling get() on the records, the problem
is that the get() call will block for linger.ms plus the time to send. But at
the time you call flush() you are actually done sending new stuff and you
want that stuff to get sent; lingering around in case of new writes is
silly. In the absence of flush() there is no way to say that. As you say,
you only pay that penalty on one of the get() calls, but if linger.ms
is high (say 60 seconds) that will be a huge penalty.
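
The linger penalty described above can be illustrated with a small,
self-contained sketch. ToyProducer below is a hypothetical in-memory
stand-in, not the Kafka client: its send() opens a batch that is only
"sent" once the linger timer fires, and its flush() sends the open batch
right away. All class and method names here are assumptions made for
illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class FlushDemo {

    /** Toy stand-in for a batching producer. Hypothetical; NOT the Kafka client. */
    static class ToyProducer implements AutoCloseable {
        private final long lingerMs;
        private final ScheduledExecutorService sender =
                Executors.newSingleThreadScheduledExecutor();
        private final List<CompletableFuture<String>> batch = new ArrayList<>();
        private ScheduledFuture<?> lingerTimer;

        ToyProducer(long lingerMs) {
            this.lingerMs = lingerMs;
        }

        /** Appends to the open batch; the first record in a batch starts the linger timer. */
        synchronized CompletableFuture<String> send(String record) {
            CompletableFuture<String> ack = new CompletableFuture<>();
            if (batch.isEmpty()) {
                lingerTimer = sender.schedule(this::drain, lingerMs, TimeUnit.MILLISECONDS);
            }
            batch.add(ack);
            return ack;
        }

        /** "Sends" the open batch: every record in it completes successfully. */
        private synchronized void drain() {
            if (lingerTimer != null) {
                lingerTimer.cancel(false); // drop the pending timer, if any
                lingerTimer = null;
            }
            for (CompletableFuture<String> ack : batch) {
                ack.complete("ok");
            }
            batch.clear();
        }

        /** Sends the open batch now, without waiting out the linger, and blocks until done. */
        void flush() {
            List<CompletableFuture<String>> pending;
            synchronized (this) {
                pending = new ArrayList<>(batch);
            }
            sender.execute(this::drain);
            for (CompletableFuture<String> ack : pending) {
                ack.join();
            }
        }

        @Override
        public void close() {
            sender.shutdownNow();
        }
    }

    /** Sends n records, waits for all of them, and returns the elapsed milliseconds. */
    static long timeBatch(long lingerMs, int n, boolean useFlush) {
        try (ToyProducer producer = new ToyProducer(lingerMs)) {
            long start = System.nanoTime();
            List<CompletableFuture<String>> acks = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                acks.add(producer.send("rec-" + i));
            }
            if (useFlush) {
                producer.flush();
            }
            for (CompletableFuture<String> ack : acks) {
                ack.join(); // without flush(), the first join waits out the linger
            }
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        }
    }

    public static void main(String[] args) {
        System.out.println("get() only: " + timeBatch(500, 3, false) + " ms");
        System.out.println("flush():    " + timeBatch(500, 3, true) + " ms");
    }
}
```

In this toy, the get()-only path should take at least the linger time,
while the flush() path should return almost immediately. A real producer
adds network time to both.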

-Jay


Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Joel Koshy <jj...@gmail.com>.
- WRT the motivation: "if you set linger.ms > 0 to encourage batching
  of messages, which is likely a good idea for this kind of use case,
  then the second for loop will block for a ms" -> however, in
  practice this will really only be for the first couple of calls
  right? Since the subsequent calls would return immediately since in
  all likelihood those subsequent messages would have gone out on the
  previous message's batch.
- I think Bhavesh's suggestion on the timeout makes sense for
  consistency (with other blocking-style calls) if nothing else.
- Does it make sense to fold in the API changes for KAFKA-1660 and
  KAFKA-1669 and do all at once?

Thanks,

Joel


On Mon, Feb 09, 2015 at 02:44:06PM -0800, Guozhang Wang wrote:
> The proposal looks good to me, will need some time to review the
> implementation RB later.
> 
> Bhavesh, I am wondering how you will use a flush() with a timeout since
> such a call does not actually provide any flushing guarantees?
> 
> As for close(), there is a separate JIRA for this:
> 
> KAFKA-1660 <https://issues.apache.org/jira/browse/KAFKA-1660>
> 
> Guozhang
> 
> 
> On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <mi...@gmail.com>
> wrote:
> 
> > Hi Jay,
> >
> > How about adding a timeout to each method call, flush(timeout, TimeUnit) and
> > close(timeout, TimeUnit)?  We had a runaway io thread issue, and the caller thread
> > should not be blocked forever by these methods.
> >
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > Well actually in the case of linger.ms = 0 the send is still
> > asynchronous
> > > so calling flush() blocks until all the previously sent records have
> > > completed. It doesn't speed anything up in that case, though, since they
> > > are already available to send.
> > >
> > > -Jay
> > >
> > > On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gs...@cloudera.com>
> > > wrote:
> > >
> > > > Looks good to me.
> > > >
> > > > I like the idea of not blocking additional sends but not guaranteeing
> > > that
> > > > flush() will deliver them.
> > > >
> > > > I assume that with linger.ms = 0, flush will just be a noop (since the
> > > > queue will be empty). Is that correct?
> > > >
> > > > Gwen
> > > >
> > > > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <ja...@gmail.com>
> > wrote:
> > > >
> > > > > Following up on our previous thread on making batch send a little
> > > easier,
> > > > > here is a concrete proposal to add a flush() method to the producer:
> > > > >
> > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> > > > >
> > > > > A proposed implementation is here:
> > > > > https://issues.apache.org/jira/browse/KAFKA-1865
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > -Jay
> > > > >
> > > >
> > >
> >
> 
> 
> 
> -- 
> -- Guozhang


Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Jay Kreps <ja...@gmail.com>.
There have been a couple of rounds of this. Basically a bunch of complaints
people have about the producer boil down to there being no limit on how
long a request will block if the Kafka cluster goes hard down. Some of the
discussion was here, I think:
https://issues.apache.org/jira/browse/KAFKA-1788

But a lot was on previous producer-related tickets. E.g. close() blocking
forever if the Kafka cluster is down happens because the requests never
fail; they just queue indefinitely waiting for Kafka to come back.

In any case, for the purpose of this KIP we don't need to pick a mechanism
or configuration for controlling client request timeout. All we are saying
now is that we should add such a mechanism, and when we do it will address
any concerns about flush() blocking for an indeterminate amount of time (so
we don't need any kind of timeout on flush() itself now).
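
As a rough sketch of why a per-request timeout would bound flush(): if
every in-flight request either succeeds or fails within the timeout, then
waiting for all of them cannot take longer than that timeout. The code
below models requests as plain CompletableFutures. The names
unansweredRequest and flush, and the 200 ms value, are hypothetical and
not part of any Kafka API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class RequestTimeoutDemo {

    /** A request that the (down) cluster never answers; it fails after timeoutMs. */
    static CompletableFuture<String> unansweredRequest(long timeoutMs) {
        // orTimeout (Java 9+) completes the future with a TimeoutException.
        return new CompletableFuture<String>().orTimeout(timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Blocks until every request has completed, successfully or with an error. */
    static void flush(List<CompletableFuture<String>> inFlight) {
        for (CompletableFuture<String> request : inFlight) {
            try {
                request.join();
            } catch (RuntimeException failed) {
                // A failed request still counts as completed; the error stays on its future.
            }
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> acked = CompletableFuture.completedFuture("ok");
        CompletableFuture<String> stuck = unansweredRequest(200); // hypothetical 200 ms timeout

        long start = System.nanoTime();
        flush(List.of(acked, stuck)); // returns once the timeout fails the stuck request
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        System.out.println("flush() returned after " + elapsedMs + " ms");
        System.out.println("stuck request failed: " + stuck.isCompletedExceptionally());
    }
}
```

The caller can still inspect each future afterwards to see which sends
failed, which matches the idea that flush() only guarantees completion,
not success.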

-Jay

On Wed, Feb 18, 2015 at 4:24 PM, Joel Koshy <jj...@gmail.com> wrote:

> Actually, could you clarify this a bit (since I'm not sure which
> thread you are referring to) - specifically, how would this tie in
> with the current timeout we have for the producer (for example)?
>
> On Tue, Feb 17, 2015 at 02:55:44PM -0800, Jay Kreps wrote:
> > Yeah there was a separate thread on adding a client-side timeout to
> > requests. We should have this in the new java clients, it just isn't
> there
> > yet. When we do this the flush() call will implicitly have the same
> timeout
> > as the requests (since they will complete or fail by then). I think this
> > makes flush(timeout) and potentially close(timeout) both unnecessary.
> >
> > -Jay
> >
> > On Tue, Feb 17, 2015 at 2:44 PM, Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > In the scala clients we have the socket.timeout config as we are using
> > > blocking IOs, when such timeout is reached the TimeoutException will be
> > > thrown from the socket and the client can handle it accordingly; in the
> > > java clients we are switching to non-blocking IOs and hence we will not
> > > have the socket timeout any more.
> > >
> > > I agree that we could add this client request timeout back in the java
> > > clients, in addition to allowing client / server's non-blocking
> selector to
> > > close idle sockets.
> > >
> > > Guozhang
> > >
> > > On Tue, Feb 17, 2015 at 1:55 PM, Jiangjie Qin
> <jq...@linkedin.com.invalid>
> > > wrote:
> > >
> > > > I'm thinking the flush call timeout will naturally be the timeout
> for a
> > > > produce request, No?
> > > >
> > > > > Currently it seems we don't have a timeout for client requests,
> should we
> > > > have one?
> > > >
> > > > > -Jiangjie (Becket) Qin
> > > >
> > > > On 2/16/15, 8:19 PM, "Jay Kreps" <ja...@gmail.com> wrote:
> > > >
> > > > >Yes, I think we all agree it would be good to add a client-side
> request
> > > > >timeout. That would effectively imply a flush timeout as well since
> any
> > > > >requests that couldn't complete in that time would be errors and
> hence
> > > > >completed in the definition we gave.
> > > > >
> > > > >-Jay
> > > > >
> > > > >On Mon, Feb 16, 2015 at 7:57 PM, Bhavesh Mistry
> > > > ><mi...@gmail.com>
> > > > >wrote:
> > > > >
> > > > >> Hi All,
> > > > >>
> > > > >> Thanks Jay and all  address concern.  I am fine with just having
> > > flush()
> > > > >> method as long as it covers failure mode and resiliency.  e.g We
> had
> > > > >> situation where entire Kafka cluster brokers were reachable, but
> upon
> > > > >> adding new kafka node and admin migrated "leader to new brokers"
> that
> > > > >>new
> > > > >> brokers is NOT reachable from producer stand point due to fire
> wall
> > > but
> > > > >> metadata would continue to elect new broker as leader for that
> > > > >>partition.
> > > > >>
> > > > >> All I am asking is either you will have to give-up sending to this
> > > > >>broker
> > > > >> or do something in this scenario.  As for the current code 0.8.2
> > > > >>release,
> > > > >> caller thread of flush() or close() method would be blocked for
> > > ever....
> > > > >> so all I am asking is
> > > > >>
> > > > >> https://issues.apache.org/jira/browse/KAFKA-1659
> > > > >> https://issues.apache.org/jira/browse/KAFKA-1660
> > > > >>
> > > > >> Also, I recall that there is timeout also added to batch to
> indicate
> > > how
> > > > >> long "message" can retain in memory before expiring.
> > > > >>
> > > > >> Given,  all this should this API be consistent with others up
> coming
> > > > >> patches for addressing similar problem(s).
> > > > >>
> > > > >>
> > > > >> Otherwise, what we have done is spawn a thread for just calling
> > > close()
> > > > >>or
> > > > >> flush with timeout for join on caller end.
> > > > >>
> > > > >> Anyway, I just wanted to give you issues with existing API and if
> you
> > > > >>guys
> > > > >> think this is fine then, I am ok with this approach. It is just
> that
> > > > >>caller
> > > > >> will have to do bit more work.
> > > > >>
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Bhavesh
> > > > >>
> > > > >> On Thursday, February 12, 2015, Joel Koshy <jj...@gmail.com>
> > > wrote:
> > > > >>
> > > > >> > Yes that is a counter-example. I'm okay either way on whether we
> > > > >> > should have just flush() or have a timeout. Bhavesh, does Jay's
> > > > >> > explanation a few replies prior address your concern? If so,
> shall
> > > we
> > > > >> > consider this closed?
> > > > >> >
> > > > >> > On Tue, Feb 10, 2015 at 01:36:23PM -0800, Jay Kreps wrote:
> > > > >> > > Yeah we could do that, I guess I just feel like it adds
> confusion
> > > > >> because
> > > > >> > > then you have to think about which timeout you want, when
> likely
> > > you
> > > > >> > don't
> > > > >> > > want a timeout at all.
> > > > >> > >
> > > > >> > > I guess the pattern I was thinking of was fflush or the java
> > > > >> equivalent,
> > > > >> > > which don't have timeouts:
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >>
> > > >
> > >
> http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html#flush(
> > > > >>)
> > > > >> > >
> > > > >> > > -Jay
> > > > >> > >
> > > > >> > > On Tue, Feb 10, 2015 at 10:41 AM, Joel Koshy <
> jjkoshy.w@gmail.com
> > > >
> > > > >> > wrote:
> > > > >> > >
> > > > >> > > > I think tryFlush with a timeout sounds good to me. This is
> > > really
> > > > >> more
> > > > >> > > > for consistency than anything else. I cannot think of any
> > > standard
> > > > >> > > > blocking calls off the top of my head that don't have a
> timed
> > > > >> variant.
> > > > >> > > > E.g., Thread.join, Object.wait, Future.get Either that, or
> they
> > > > >> > > > provide an entirely non-blocking mode (e.g.,
> > > socketChannel.connect
> > > > >> > > > followed by finishConnect)
> > > > >> > > >
> > > > >> > > > Thanks,
> > > > >> > > >
> > > > >> > > > Joel
> > > > >> > > >
> > > > >> > > > On Tue, Feb 10, 2015 at 11:30:47AM -0500, Joe Stein wrote:
> > > > >> > > > > Jay,
> > > > >> > > > >
> > > > >> > > > > The .flush() call seems like it would be the best way if
> you
> > > > >>wanted
> > > > >> > > > to-do a
> > > > >> > > > > clean shutdown of the new producer?
> > > > >> > > > >
> > > > >> > > > > So, you could in your code "stop all incoming requests &&
> > > > >> > > > producer.flush()
> > > > >> > > > > && system.exit(value)" and know pretty much you won't drop
> > > > >>anything
> > > > >> > on
> > > > >> > > > the
> > > > >> > > > > floor.
> > > > >> > > > >
> > > > >> > > > > This can be done with the callbacks and futures (sure) but
> > > > >>.flush()
> > > > >> > seems
> > > > >> > > > > to be the right time to block and a few lines of code, no?
> > > > >> > > > >
> > > > >> > > > > ~ Joestein
> > > > >> > > > >
> > > > >> > > > > On Tue, Feb 10, 2015 at 11:25 AM, Jay Kreps
> > > > >><ja...@gmail.com>
> > > > >> > wrote:
> > > > >> > > > >
> > > > >> > > > > > Hey Bhavesh,
> > > > >> > > > > >
> > > > >> > > > > > If a broker is not available a new one should be
> elected to
> > > > >>take
> > > > >> > over,
> > > > >> > > > so
> > > > >> > > > > > although the flush might take longer it should still be
> > > quick.
> > > > >> > Even if
> > > > >> > > > not
> > > > >> > > > > > this should result in an error not a hang.
> > > > >> > > > > >
> > > > >> > > > > > The cases you enumerated are all covered already--if the
> > > user
> > > > >> > wants to
> > > > >> > > > > > retry that is covered by the retry setting in the
> client,
> > > for
> > > > >>all
> > > > >> > the
> > > > >> > > > > > errors that is considered completion of the request. The
> > > post
> > > > >> > > > condition of
> > > > >> > > > > > flush isn't that all sends complete successfully, just
> that
> > > > >>they
> > > > >> > > > complete.
> > > > >> > > > > > So if you try to send a message that is too big, when
> flush
> > > > >> returns
> > > > >> > > > calling
> > > > >> > > > > > .get() on the future should not block and should
> produce the
> > > > >> error.
> > > > >> > > > > >
> > > > >> > > > > > Basically the argument I am making is that the only
> reason
> > > you
> > > > >> > want to
> > > > >> > > > call
> > > > >> > > > > > flush() is to guarantee all the sends complete so if it
> > > > >>doesn't
> > > > >> > > > guarantee
> > > > >> > > > > > that it will be somewhat confusing. This does mean
> blocking,
> > > > >>but
> > > > >> > if you
> > > > >> > > > > > don't want to block on the send then you wouldn't call
> > > > >>flush().
> > > > >> > > > > >
> > > > >> > > > > > This has no impact on the block.on.buffer full setting.
> That
> > > > >> > impacts
> > > > >> > > > what
> > > > >> > > > > > happens when send() can't append to the buffer because
> it is
> > > > >> full.
> > > > >> > > > flush()
> > > > >> > > > > > means any message previously sent (i.e. for which send()
> > > call
> > > > >>has
> > > > >> > > > returned)
> > > > >> > > > > > needs to have its request completed. Hope that makes
> sense.
> > > > >> > > > > >
> > > > >> > > > > > -Jay
> > > > >> > > > > >
> > > > >> > > > > > On Mon, Feb 9, 2015 at 11:52 PM, Bhavesh Mistry <
> > > > >> > > > > > mistry.p.bhavesh@gmail.com>
> > > > >> > > > > > wrote:
> > > > >> > > > > >
> > > > >> > > > > > > HI Jay,
> > > > >> > > > > > >
> > > > >> > > > > > > Imagine, if you have flaky network connection to
> brokers,
> > > > >>and
> > > > >> if
> > > > >> > > > flush()
> > > > >> > > > > > > will be blocked if "one of broker is not available" (
> > > > >>basically
> > > > >> > How
> > > > >> > > > would
> > > > >> > > > > > > be address failure mode and io thread not able to
> drain
> > > > >>records
> > > > >> > or
> > > > >> > > > busy
> > > > >> > > > > > due
> > > > >> > > > > > > to pending request". Do you flush() method is only to
> > > flush
> > > > >>to
> > > > >> > in mem
> > > > >> > > > > > queue
> > > > >> > > > > > > or flush to broker over the network().
> > > > >> > > > > > >
> > > > >> > > > > > > Timeout helps with and pushing caller to handle what
> to do
> > > > >>?
> > > > >> > e.g
> > > > >> > > > > > > re-enqueue records, drop entire batch or one of
> message is
> > > > >>too
> > > > >> > big
> > > > >> > > > cross
> > > > >> > > > > > > the limit of max.message.size etc...
> > > > >> > > > > > >
> > > > >> > > > > > > Also, according to java doc for API  "The method will
> > > block
> > > > >> > until all
> > > > >> > > > > > > previously sent records have completed sending (either
> > > > >> > successfully
> > > > >> > > > or
> > > > >> > > > > > with
> > > > >> > > > > > > an error)", does this by-pass rule set by for
> > > > >> > block.on.buffer.full or
> > > > >> > > > > > > batch.size
> > > > >> > > > > > > when under load.
> > > > >> > > > > > >
> > > > >> > > > > > > That was my intention, and I am sorry I mixed-up
> close()
> > > > >>method
> > > > >> > here
> > > > >> > > > > > > without knowing that this is only for bulk send.
> > > > >> > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > > > Thanks,
> > > > >> > > > > > >
> > > > >> > > > > > > Bhavesh
> > > > >> > > > > > >
> > > > >> > > > > > > On Mon, Feb 9, 2015 at 8:17 PM, Jay Kreps
> > > > >><jay.kreps@gmail.com
> > > > >> >
> > > > >> > > > wrote:
> > > > >> > > > > > >
> > > > >> > > > > > > > Yeah I second the problem Guozhang flags with giving
> > > > >>flush a
> > > > >> > > > timeout.
> > > > >> > > > > > In
> > > > >> > > > > > > > general failover in Kafka is a bounded thing unless
> you
> > > > >>have
> > > > >> > > > brought
> > > > >> > > > > > your
> > > > >> > > > > > > > Kafka cluster down entirely so I think depending on
> that
> > > > >> bound
> > > > >> > > > > > implicitly
> > > > >> > > > > > > > is okay.
> > > > >> > > > > > > >
> > > > >> > > > > > > > It is possible to make flush() be instead
> > > > >> > > > > > > >   boolean tryFlush(long timeout, TimeUnit unit);
> > > > >> > > > > > > >
> > > > >> > > > > > > > But I am somewhat skeptical that people will use
> this
> > > > >> > correctly.
> > > > >> > > > I.e
> > > > >> > > > > > > > consider the mirror maker code snippet I gave
> above, how
> > > > >> would
> > > > >> > one
> > > > >> > > > > > > actually
> > > > >> > > > > > > > recover in this case other than retrying (which the
> > > client
> > > > >> > already
> > > > >> > > > does
> > > > >> > > > > > > > automatically)? After all if you are okay losing
> data
> > > then
> > > > >> you
> > > > >> > > > don't
> > > > >> > > > > > need
> > > > >> > > > > > > > to bother calling flush at all, you can just let the
> > > > >>messages
> > > > >> > be
> > > > >> > > > sent
> > > > >> > > > > > > > asynchronously.
> > > > >> > > > > > > >
> > > > >> > > > > > > > I think close() is actually different because you
> may
> > > well
> > > > >> > want to
> > > > >> > > > > > > shutdown
> > > > >> > > > > > > > immediately and just throw away unsent events.
> > > > >> > > > > > > >
> > > > >> > > > > > > > -Jay
> > > > >> > > > > > > >
> > > > >> > > > > > > > On Mon, Feb 9, 2015 at 2:44 PM, Guozhang Wang <
> > > > >> > wangguoz@gmail.com>
> > > > >> > > > > > > wrote:
> > > > >> > > > > > > >
> > > > >> > > > > > > > > The proposal looks good to me, will need some
> time to
> > > > >> review
> > > > >> > the
> > > > >> > > > > > > > > implementation RB later.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > Bhavesh, I am wondering how you will use a flush()
> > > with
> > > > >>a
> > > > >> > timeout
> > > > >> > > > > > since
> > > > >> > > > > > > > > such a call does not actually provide any flushing
> > > > >> > guarantees?
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > As for close(), there is a separate JIRA for this:
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > KAFKA-1660 <
> > > > >> https://issues.apache.org/jira/browse/KAFKA-1660
> > > > >> > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > Guozhang
> > > > >> > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <
> > > > >> > > > > > > > mistry.p.bhavesh@gmail.com
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > wrote:
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > > Hi Jay,
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > How about adding timeout for each method calls
> > > > >> > > > > > > flush(timeout,TimeUnit)
> > > > >> > > > > > > > > and
> > > > >> > > > > > > > > > close(timeout,TimeUNIT) ?  We had runway io
> thread
> > > > >>issue
> > > > >> > and
> > > > >> > > > caller
> > > > >> > > > > > > > > thread
> > > > >> > > > > > > > > > should not blocked for ever for these methods ?
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Thanks,
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Bhavesh
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <
> > > > >> > > > jay.kreps@gmail.com>
> > > > >> > > > > > > > wrote:
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > > Well actually in the case of linger.ms = 0
> the
> > > send
> > > > >>is
> > > > >> > still
> > > > >> > > > > > > > > > asynchronous
> > > > >> > > > > > > > > > > so calling flush() blocks until all the
> previously
> > > > >>sent
> > > > >> > > > records
> > > > >> > > > > > > have
> > > > >> > > > > > > > > > > completed. It doesn't speed anything up in
> that
> > > > >>case,
> > > > >> > though,
> > > > >> > > > > > since
> > > > >> > > > > > > > > they
> > > > >> > > > > > > > > > > are already available to send.
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > -Jay
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira
> <
> > > > >> > > > > > > gshapira@cloudera.com
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > > > wrote:
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > > Looks good to me.
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > I like the idea of not blocking additional
> sends
> > > > >>but
> > > > >> > not
> > > > >> > > > > > > > guaranteeing
> > > > >> > > > > > > > > > > that
> > > > >> > > > > > > > > > > > flush() will deliver them.
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > I assume that with linger.ms = 0, flush
> will
> > > just
> > > > >> be a
> > > > >> > > > noop
> > > > >> > > > > > > (since
> > > > >> > > > > > > > > the
> > > > >> > > > > > > > > > > > queue will be empty). Is that correct?
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > Gwen
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <
> > > > >> > > > > > jay.kreps@gmail.com>
> > > > >> > > > > > > > > > wrote:
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > Following up on our previous thread on
> making
> > > > >>batch
> > > > >> > send
> > > > >> > > > a
> > > > >> > > > > > > little
> > > > >> > > > > > > > > > > easier,
> > > > >> > > > > > > > > > > > > here is a concrete proposal to add a
> flush()
> > > > >>method
> > > > >> > to
> > > > >> > > > the
> > > > >> > > > > > > > > producer:
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > >
> > > > >> > > >
> > > > >> >
> > > > >>
> > > > >>
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+met
> > > > >>hod+to+the+producer+API
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > A proposed implementation is here:
> > > > >> > > > > > > > > > > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-1865
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > Thoughts?
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > -Jay
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > --
> > > > >> > > > > > > > > -- Guozhang
> > > > >> > > > > > > > >
> > > > >> > > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > >
> > > > >> > > >
> > > > >> > > >
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
>
>

Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Joel Koshy <jj...@gmail.com>.
Actually, could you clarify this a bit (since I'm not sure which
thread you are referring to) - specifically, how would this tie in
with the current timeout we have for the producer (for example)?

On Tue, Feb 17, 2015 at 02:55:44PM -0800, Jay Kreps wrote:
> Yeah there was a separate thread on adding a client-side timeout to
> requests. We should have this in the new java clients, it just isn't there
> yet. When we do this the flush() call will implicitly have the same timeout
> as the requests (since they will complete or fail by then). I think this
> makes flush(timeout) and potentially close(timeout) both unnecessary.
> 
> -Jay


Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Jay Kreps <ja...@gmail.com>.
Yeah, there was a separate thread on adding a client-side timeout to
requests. We should have this in the new Java clients; it just isn't there
yet. When we do this, the flush() call will implicitly have the same timeout
as the requests (since they will complete or fail by then). I think this
makes flush(timeout) and potentially close(timeout) both unnecessary.

-Jay
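
To make the argument above concrete, here is a minimal Java sketch. It does
not use the Kafka client: FlushTimeoutSketch, its flush() helper and the
200 ms requestTimeoutMs are hypothetical stand-ins, and each in-flight send
is modelled as a CompletableFuture. The assumption is only that every request
either completes or fails within the request timeout, in which case a flush
that waits for completion (not success) is bounded by that same timeout.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class FlushTimeoutSketch {

    // Hypothetical stand-in for flush(): block until every in-flight send has
    // completed, whether it succeeded or failed.
    public static void flush(List<CompletableFuture<String>> inFlight) {
        for (CompletableFuture<String> send : inFlight) {
            // handle() maps both success and failure to a normal completion,
            // so join() waits for the send without rethrowing its error.
            send.handle((value, error) -> null).join();
        }
    }

    public static void main(String[] args) {
        long requestTimeoutMs = 200; // assumed client-side request timeout

        // A send that was acknowledged.
        CompletableFuture<String> acked = CompletableFuture.completedFuture("acked");
        // A send that never gets a response; only the request timeout completes it.
        CompletableFuture<String> stuck =
            new CompletableFuture<String>().orTimeout(requestTimeoutMs, TimeUnit.MILLISECONDS);

        flush(List.of(acked, stuck)); // returns once the stuck send has timed out

        System.out.println("acked failed: " + acked.isCompletedExceptionally());
        System.out.println("stuck failed: " + stuck.isCompletedExceptionally());
    }
}
```

After flush() returns, inspecting each future does not block, and the failed
one carries the timeout error. The sketch needs Java 9 or later for
orTimeout() and List.of().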

On Tue, Feb 17, 2015 at 2:44 PM, Guozhang Wang <wa...@gmail.com> wrote:

> In the scala clients we have the socket.timeout config as we are using
> blocking IOs, when such timeout is reached the TimeoutException will be
> thrown from the socket and the client can handle it accordingly; in the
> java clients we are switching to non-blocking IOs and hence we will not
> have the socket timeout any more.
>
> I agree that we could add this client request timeout back in the java
> clients, in addition to allowing client / server's non-blocking selector to
> close idle sockets.
>
> Guozhang
>
> On Tue, Feb 17, 2015 at 1:55 PM, Jiangjie Qin <jq...@linkedin.com.invalid>
> wrote:
>
> > I'm thinking the flush call timeout will naturally be the timeout for a
> > produce request, No?
> >
> > Currently it seems we don't have a timeout for client requests, should we
> > have one?
> >
> > -Jiangjie (Becket) Qin
> >
> > On 2/16/15, 8:19 PM, "Jay Kreps" <ja...@gmail.com> wrote:
> >
> > >Yes, I think we all agree it would be good to add a client-side request
> > >timeout. That would effectively imply a flush timeout as well since any
> > >requests that couldn't complete in that time would be errors and hence
> > >completed in the definition we gave.
> > >
> > >-Jay
> > >
> > >On Mon, Feb 16, 2015 at 7:57 PM, Bhavesh Mistry
> > ><mi...@gmail.com>
> > >wrote:
> > >
> > >> Hi All,
> > >>
> > >> Thanks Jay and all  address concern.  I am fine with just having flush()
> > >> method as long as it covers failure mode and resiliency.  e.g We had
> > >> situation where entire Kafka cluster brokers were reachable, but upon
> > >> adding new kafka node and admin migrated "leader to new brokers"  that new
> > >> brokers is NOT reachable from producer stand point due to fire wall but
> > >> metadata would continue to elect new broker as leader for that partition.
> > >>
> > >> All I am asking is either you will have to give-up sending to this broker
> > >> or do something in this scenario.  As for the current code 0.8.2 release,
> > >> caller thread of flush() or close() method would be blocked for ever....
> > >> so all I am asking is
> > >>
> > >> https://issues.apache.org/jira/browse/KAFKA-1659
> > >> https://issues.apache.org/jira/browse/KAFKA-1660
> > >>
> > >> Also, I recall that there is timeout also added to batch to indicate how
> > >> long "message" can retain in memory before expiring.
> > >>
> > >> Given,  all this should this API be consistent with others up coming
> > >> patches for addressing similar problem(s).
> > >>
> > >>
> > >> Otherwise, what we have done is spawn a thread for just calling close() or
> > >> flush with timeout for join on caller end.
> > >>
> > >> Anyway, I just wanted to give you issues with existing API and if you guys
> > >> think this is fine then, I am ok with this approach. It is just that caller
> > >> will have to do bit more work.
> > >>
> > >>
> > >> Thanks,
> > >>
> > >> Bhavesh
> > >>
> > >> On Thursday, February 12, 2015, Joel Koshy <jj...@gmail.com> wrote:
> > >>
> > >> > Yes that is a counter-example. I'm okay either way on whether we
> > >> > should have just flush() or have a timeout. Bhavesh, does Jay's
> > >> > explanation a few replies prior address your concern? If so, shall we
> > >> > consider this closed?
> > >> >
> > >> > On Tue, Feb 10, 2015 at 01:36:23PM -0800, Jay Kreps wrote:
> > >> > > Yeah we could do that, I guess I just feel like it adds confusion because
> > >> > > then you have to think about which timeout you want, when likely you don't
> > >> > > want a timeout at all.
> > >> > >
> > >> > > I guess the pattern I was thinking of was fflush or the java equivalent,
> > >> > > which don't have timeouts:
> > >> > >
> > >> > > http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html#flush()
> > >> > >
> > >> > > -Jay
> > >> > >
> > >> > > On Tue, Feb 10, 2015 at 10:41 AM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
> > >> > >
> > >> > > > I think tryFlush with a timeout sounds good to me. This is really more
> > >> > > > for consistency than anything else. I cannot think of any standard
> > >> > > > blocking calls off the top of my head that don't have a timed variant.
> > >> > > > E.g., Thread.join, Object.wait, Future.get Either that, or they
> > >> > > > provide an entirely non-blocking mode (e.g., socketChannel.connect
> > >> > > > followed by finishConnect)
> > >> > > >
> > >> > > > Thanks,
> > >> > > >
> > >> > > > Joel
> > >> > > >
> > >> > > > On Tue, Feb 10, 2015 at 11:30:47AM -0500, Joe Stein wrote:
> > >> > > > > Jay,
> > >> > > > >
> > >> > > > > The .flush() call seems like it would be the best way if you wanted to-do a
> > >> > > > > clean shutdown of the new producer?
> > >> > > > >
> > >> > > > > So, you could in your code "stop all incoming requests && producer.flush()
> > >> > > > > && system.exit(value)" and know pretty much you won't drop anything on the
> > >> > > > > floor.
> > >> > > > >
> > >> > > > > This can be done with the callbacks and futures (sure) but .flush() seems
> > >> > > > > to be the right time to block and a few lines of code, no?
> > >> > > > >
> > >> > > > > ~ Joestein
> > >> > > > >
> > >> > > > > On Tue, Feb 10, 2015 at 11:25 AM, Jay Kreps <ja...@gmail.com> wrote:
> > >> > > > >
> > >> > > > > > Hey Bhavesh,
> > >> > > > > >
> > >> > > > > > If a broker is not available a new one should be elected to take over, so
> > >> > > > > > although the flush might take longer it should still be quick. Even if not
> > >> > > > > > this should result in an error not a hang.
> > >> > > > > >
> > >> > > > > > The cases you enumerated are all covered already--if the user wants to
> > >> > > > > > retry that is covered by the retry setting in the client, for all the
> > >> > > > > > errors that is considered completion of the request. The post condition of
> > >> > > > > > flush isn't that all sends complete successfully, just that they complete.
> > >> > > > > > So if you try to send a message that is too big, when flush returns calling
> > >> > > > > > .get() on the future should not block and should produce the error.
> > >> > > > > >
> > >> > > > > > Basically the argument I am making is that the only reason you want to call
> > >> > > > > > flush() is to guarantee all the sends complete so if it doesn't guarantee
> > >> > > > > > that it will be somewhat confusing. This does mean blocking, but if you
> > >> > > > > > don't want to block on the send then you wouldn't call flush().
> > >> > > > > >
> > >> > > > > > This has no impact on the block.on.buffer.full setting. That impacts what
> > >> > > > > > happens when send() can't append to the buffer because it is full. flush()
> > >> > > > > > means any message previously sent (i.e. for which send() call has returned)
> > >> > > > > > needs to have its request completed. Hope that makes sense.
> > >> > > > > >
> > >> > > > > > -Jay
> > >> > > > > >
> > >> > > > > > On Mon, Feb 9, 2015 at 11:52 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com> wrote:
> > >> > > > > >
> > >> > > > > > > HI Jay,
> > >> > > > > > >
> > >> > > > > > > Imagine, if you have flaky network connection to brokers, and if flush()
> > >> > > > > > > will be blocked if "one of broker is not available" ( basically How would
> > >> > > > > > > be address failure mode and io thread not able to drain records or busy due
> > >> > > > > > > to pending request". Do you flush() method is only to flush to in mem queue
> > >> > > > > > > or flush to broker over the network().
> > >> > > > > > >
> > >> > > > > > > Timeout helps with and pushing caller to handle what to do ? e.g
> > >> > > > > > > re-enqueue records, drop entire batch or one of message is too big cross
> > >> > > > > > > the limit of max.message.size etc...
> > >> > > > > > >
> > >> > > > > > > Also, according to java doc for API  "The method will block until all
> > >> > > > > > > previously sent records have completed sending (either successfully or with
> > >> > > > > > > an error)", does this by-pass rule set by for block.on.buffer.full or
> > >> > > > > > > batch.size when under load.
> > >> > > > > > >
> > >> > > > > > > That was my intention, and I am sorry I mixed-up close() method here
> > >> > > > > > > without knowing that this is only for bulk send.
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > Thanks,
> > >> > > > > > >
> > >> > > > > > > Bhavesh
> > >> > > > > > >
> > >> > > > > > > On Mon, Feb 9, 2015 at 8:17 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> > >> > > > > > >
> > >> > > > > > > > Yeah I second the problem Guozhang flags with giving flush a timeout. In
> > >> > > > > > > > general failover in Kafka is a bounded thing unless you have brought your
> > >> > > > > > > > Kafka cluster down entirely so I think depending on that bound implicitly
> > >> > > > > > > > is okay.
> > >> > > > > > > >
> > >> > > > > > > > It is possible to make flush() be instead
> > >> > > > > > > >   boolean tryFlush(long timeout, TimeUnit unit);
> > >> > > > > > > >
> > >> > > > > > > > But I am somewhat skeptical that people will use this correctly. I.e
> > >> > > > > > > > consider the mirror maker code snippet I gave above, how would one actually
> > >> > > > > > > > recover in this case other than retrying (which the client already does
> > >> > > > > > > > automatically)? After all if you are okay losing data then you don't need
> > >> > > > > > > > to bother calling flush at all, you can just let the messages be sent
> > >> > > > > > > > asynchronously.
> > >> > > > > > > >
> > >> > > > > > > > I think close() is actually different because you may well want to shutdown
> > >> > > > > > > > immediately and just throw away unsent events.
> > >> > > > > > > >
> > >> > > > > > > > -Jay
> > >> > > > > > > >
> > >> > > > > > > > On Mon, Feb 9, 2015 at 2:44 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > >> > > > > > > >
> > >> > > > > > > > > The proposal looks good to me, will need some time to review the
> > >> > > > > > > > > implementation RB later.
> > >> > > > > > > > >
> > >> > > > > > > > > Bhavesh, I am wondering how you will use a flush() with a timeout since
> > >> > > > > > > > > such a call does not actually provide any flushing guarantees?
> > >> > > > > > > > >
> > >> > > > > > > > > As for close(), there is a separate JIRA for this:
> > >> > > > > > > > >
> > >> > > > > > > > > KAFKA-1660 <https://issues.apache.org/jira/browse/KAFKA-1660>
> > >> > > > > > > > >
> > >> > > > > > > > > Guozhang
> > >> > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > > > On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com> wrote:
> > >> > > > > > > > >
> > >> > > > > > > > > > Hi Jay,
> > >> > > > > > > > > >
> > >> > > > > > > > > > How about adding timeout for each method calls flush(timeout,TimeUnit) and
> > >> > > > > > > > > > close(timeout,TimeUNIT) ?  We had runway io thread issue and caller thread
> > >> > > > > > > > > > should not blocked for ever for these methods ?
> > >> > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > > > Thanks,
> > >> > > > > > > > > >
> > >> > > > > > > > > > Bhavesh
> > >> > > > > > > > > >
> > >> > > > > > > > > > On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> > >> > > > > > > > > >
> > >> > > > > > > > > > > Well actually in the case of linger.ms = 0 the send is still asynchronous
> > >> > > > > > > > > > > so calling flush() blocks until all the previously sent records have
> > >> > > > > > > > > > > completed. It doesn't speed anything up in that case, though, since they
> > >> > > > > > > > > > > are already available to send.
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > -Jay
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gshapira@cloudera.com> wrote:
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > > Looks good to me.
> > >> > > > > > > > > > > >
> > >> > > > > > > > > > > > I like the idea of not blocking additional sends but not guaranteeing that
> > >> > > > > > > > > > > > flush() will deliver them.
> > >> > > > > > > > > > > >
> > >> > > > > > > > > > > > I assume that with linger.ms = 0, flush will just be a noop (since the
> > >> > > > > > > > > > > > queue will be empty). Is that correct?
> > >> > > > > > > > > > > >
> > >> > > > > > > > > > > > Gwen
> > >> > > > > > > > > > > >
> > >> > > > > > > > > > > > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <
> > >> > > > > > jay.kreps@gmail.com>
> > >> > > > > > > > > > wrote:
> > >> > > > > > > > > > > >
> > >> > > > > > > > > > > > > Following up on our previous thread on making
> > >>batch
> > >> > send
> > >> > > > a
> > >> > > > > > > little
> > >> > > > > > > > > > > easier,
> > >> > > > > > > > > > > > > here is a concrete proposal to add a flush()
> > >>method
> > >> > to
> > >> > > > the
> > >> > > > > > > > > producer:
> > >> > > > > > > > > > > > >
> > >> > > > > > > > > > > > >
> > >> > > > > > > > > > > > >
> > >> > > > > > > > > > > >
> > >> > > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > >
> > >> >
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> > >> > > > > > > > > > > > >
> > >> > > > > > > > > > > > > A proposed implementation is here:
> > >> > > > > > > > > > > > >
> > https://issues.apache.org/jira/browse/KAFKA-1865
> > >> > > > > > > > > > > > >
> > >> > > > > > > > > > > > > Thoughts?
> > >> > > > > > > > > > > > >
> > >> > > > > > > > > > > > > -Jay
> > >> > > > > > > > > > > > >
> > >> > > > > > > > > > > >
> > >> > > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > > > --
> > >> > > > > > > > > -- Guozhang
> > >> > > > > > > > >
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > >
> > >> > > >
> > >> >
> > >> >
> > >>
> >
> >
>
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Guozhang Wang <wa...@gmail.com>.
In the Scala clients we have the socket.timeout config because we use
blocking I/O: when that timeout is reached, a TimeoutException is thrown
from the socket and the client can handle it accordingly. In the Java
clients we are switching to non-blocking I/O, and hence we no longer have
the socket timeout.

I agree that we could add this client request timeout back in the Java
clients, in addition to allowing the client's / server's non-blocking
selector to close idle sockets.

Guozhang
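
To make the idea of a client-side request timeout concrete, here is a minimal
sketch in plain Java. It is only an illustration of the pattern, not Kafka
code: RequestTimeoutSketch and withTimeout are hypothetical names, and the
pending request is modeled as a CompletableFuture rather than a real
non-blocking network request. With no blocking socket read to time out, a
timer fails the request if no response arrives in time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not a Kafka API: bounds how long an in-flight
// asynchronous request may stay pending.
class RequestTimeoutSketch {
    // Single daemon timer thread shared by all pending requests.
    private static final ScheduledExecutorService TIMER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "request-timeout");
            t.setDaemon(true); // do not keep the JVM alive
            return t;
        });

    // Fails the pending request with a TimeoutException once the deadline
    // passes. If a response completed the future first, the timer call
    // has no effect.
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> pending, long timeoutMs) {
        TIMER.schedule(() -> {
            pending.completeExceptionally(
                new TimeoutException("no response in " + timeoutMs + " ms"));
        }, timeoutMs, TimeUnit.MILLISECONDS);
        return pending;
    }

    public static void main(String[] args) throws InterruptedException {
        // A request whose broker never answers, e.g. unreachable behind a firewall.
        CompletableFuture<String> stuck = withTimeout(new CompletableFuture<>(), 100);
        try {
            stuck.get(); // returns control after roughly 100 ms instead of hanging
        } catch (ExecutionException e) {
            System.out.println("request failed: " + e.getCause());
        }
    }
}
```

The caller sees the timeout as an ordinary failed request, so whatever error
handling already exists for failed sends would apply unchanged.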

On Tue, Feb 17, 2015 at 1:55 PM, Jiangjie Qin <jq...@linkedin.com.invalid>
wrote:

> I'm thinking the flush call timeout will naturally be the timeout for a
> produce request, No?
>
> Currently it seems we don't have a timeout for client requests, should we
> have one?
>
> -Jiangjie (Becket) Qin
>
> On 2/16/15, 8:19 PM, "Jay Kreps" <ja...@gmail.com> wrote:
>
> >Yes, I think we all agree it would be good to add a client-side request
> >timeout. That would effectively imply a flush timeout as well since any
> >requests that couldn't complete in that time would be errors and hence
> >completed in the definition we gave.
> >
> >-Jay
> >
> >On Mon, Feb 16, 2015 at 7:57 PM, Bhavesh Mistry
> ><mi...@gmail.com>
> >wrote:
> >
> >> Hi All,
> >>
> >> Thanks Jay and all  address concern.  I am fine with just having flush()
> >> method as long as it covers failure mode and resiliency.  e.g We had
> >> situation where entire Kafka cluster brokers were reachable, but upon
> >> adding new kafka node and admin migrated "leader to new brokers"  that
> >>new
> >> brokers is NOT reachable from producer stand point due to fire wall but
> >> metadata would continue to elect new broker as leader for that
> >>partition.
> >>
> >> All I am asking is either you will have to give-up sending to this
> >>broker
> >> or do something in this scenario.  As for the current code 0.8.2
> >>release,
> >> caller thread of flush() or close() method would be blocked for ever....
> >> so all I am asking is
> >>
> >> https://issues.apache.org/jira/browse/KAFKA-1659
> >> https://issues.apache.org/jira/browse/KAFKA-1660
> >>
> >> Also, I recall that there is timeout also added to batch to indicate how
> >> long "message" can retain in memory before expiring.
> >>
> >> Given,  all this should this API be consistent with others up coming
> >> patches for addressing similar problem(s).
> >>
> >>
> >> Otherwise, what we have done is spawn a thread for just calling close()
> >>or
> >> flush with timeout for join on caller end.
> >>
> >> Anyway, I just wanted to give you issues with existing API and if you
> >>guys
> >> think this is fine then, I am ok with this approach. It is just that
> >>caller
> >> will have to do bit more work.
> >>
> >>
> >> Thanks,
> >>
> >> Bhavesh
> >>
> >> On Thursday, February 12, 2015, Joel Koshy <jj...@gmail.com> wrote:
> >>
> >> > Yes that is a counter-example. I'm okay either way on whether we
> >> > should have just flush() or have a timeout. Bhavesh, does Jay's
> >> > explanation a few replies prior address your concern? If so, shall we
> >> > consider this closed?
> >> >
> >> > On Tue, Feb 10, 2015 at 01:36:23PM -0800, Jay Kreps wrote:
> >> > > Yeah we could do that, I guess I just feel like it adds confusion
> >> because
> >> > > then you have to think about which timeout you want, when likely you
> >> > don't
> >> > > want a timeout at all.
> >> > >
> >> > > I guess the pattern I was thinking of was fflush or the java
> >> equivalent,
> >> > > which don't have timeouts:
> >> > >
> >> >
> >>
> >>
> http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html#flush()
> >> > >
> >> > > -Jay
> >> > >
> >> > > On Tue, Feb 10, 2015 at 10:41 AM, Joel Koshy <jj...@gmail.com>
> >> > wrote:
> >> > >
> >> > > > I think tryFlush with a timeout sounds good to me. This is really
> >> more
> >> > > > for consistency than anything else. I cannot think of any standard
> >> > > > blocking calls off the top of my head that don't have a timed
> >> variant.
> >> > > > E.g., Thread.join, Object.wait, Future.get Either that, or they
> >> > > > provide an entirely non-blocking mode (e.g., socketChannel.connect
> >> > > > followed by finishConnect)
> >> > > >
> >> > > > Thanks,
> >> > > >
> >> > > > Joel
> >> > > >
> >> > > > On Tue, Feb 10, 2015 at 11:30:47AM -0500, Joe Stein wrote:
> >> > > > > Jay,
> >> > > > >
> >> > > > > The .flush() call seems like it would be the best way if you
> >>wanted
> >> > > > to-do a
> >> > > > > clean shutdown of the new producer?
> >> > > > >
> >> > > > > So, you could in your code "stop all incoming requests &&
> >> > > > producer.flush()
> >> > > > > && system.exit(value)" and know pretty much you won't drop
> >>anything
> >> > on
> >> > > > the
> >> > > > > floor.
> >> > > > >
> >> > > > > This can be done with the callbacks and futures (sure) but
> >>.flush()
> >> > seems
> >> > > > > to be the right time to block and a few lines of code, no?
> >> > > > >
> >> > > > > ~ Joestein
> >> > > > >
> >> > > > > On Tue, Feb 10, 2015 at 11:25 AM, Jay Kreps
> >><ja...@gmail.com>
> >> > wrote:
> >> > > > >
> >> > > > > > Hey Bhavesh,
> >> > > > > >
> >> > > > > > If a broker is not available a new one should be elected to
> >>take
> >> > over,
> >> > > > so
> >> > > > > > although the flush might take longer it should still be quick.
> >> > Even if
> >> > > > not
> >> > > > > > this should result in an error not a hang.
> >> > > > > >
> >> > > > > > The cases you enumerated are all covered already--if the user
> >> > wants to
> >> > > > > > retry that is covered by the retry setting in the client, for
> >>all
> >> > the
> >> > > > > > errors that is considered completion of the request. The post
> >> > > > condition of
> >> > > > > > flush isn't that all sends complete successfully, just that
> >>they
> >> > > > complete.
> >> > > > > > So if you try to send a message that is too big, when flush
> >> returns
> >> > > > calling
> >> > > > > > .get() on the future should not block and should produce the
> >> error.
> >> > > > > >
> >> > > > > > Basically the argument I am making is that the only reason you
> >> > want to
> >> > > > call
> >> > > > > > flush() is to guarantee all the sends complete so if it
> >>doesn't
> >> > > > guarantee
> >> > > > > > that it will be somewhat confusing. This does mean blocking,
> >>but
> >> > if you
> >> > > > > > don't want to block on the send then you wouldn't call
> >>flush().
> >> > > > > >
> >> > > > > > This has no impact on the block.on.buffer full setting. That
> >> > impacts
> >> > > > what
> >> > > > > > happens when send() can't append to the buffer because it is
> >> full.
> >> > > > flush()
> >> > > > > > means any message previously sent (i.e. for which send() call
> >>has
> >> > > > returned)
> >> > > > > > needs to have its request completed. Hope that makes sense.
> >> > > > > >
> >> > > > > > -Jay
> >> > > > > >
> >> > > > > > On Mon, Feb 9, 2015 at 11:52 PM, Bhavesh Mistry <
> >> > > > > > mistry.p.bhavesh@gmail.com>
> >> > > > > > wrote:
> >> > > > > >
> >> > > > > > > HI Jay,
> >> > > > > > >
> >> > > > > > > Imagine, if you have flaky network connection to brokers,
> >>and
> >> if
> >> > > > flush()
> >> > > > > > > will be blocked if "one of broker is not available" (
> >>basically
> >> > How
> >> > > > would
> >> > > > > > > be address failure mode and io thread not able to drain
> >>records
> >> > or
> >> > > > busy
> >> > > > > > due
> >> > > > > > > to pending request". Do you flush() method is only to flush
> >>to
> >> > in mem
> >> > > > > > queue
> >> > > > > > > or flush to broker over the network().
> >> > > > > > >
> >> > > > > > > Timeout helps with and pushing caller to handle what to do
> >>?
> >> > e.g
> >> > > > > > > re-enqueue records, drop entire batch or one of message is
> >>too
> >> > big
> >> > > > cross
> >> > > > > > > the limit of max.message.size etc...
> >> > > > > > >
> >> > > > > > > Also, according to java doc for API  "The method will block
> >> > until all
> >> > > > > > > previously sent records have completed sending (either
> >> > successfully
> >> > > > or
> >> > > > > > with
> >> > > > > > > an error)", does this by-pass rule set by for
> >> > block.on.buffer.full or
> >> > > > > > > batch.size
> >> > > > > > > when under load.
> >> > > > > > >
> >> > > > > > > That was my intention, and I am sorry I mixed-up close()
> >>method
> >> > here
> >> > > > > > > without knowing that this is only for bulk send.
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > >
> >> > > > > > > Bhavesh
> >> > > > > > >
> >> > > > > > > On Mon, Feb 9, 2015 at 8:17 PM, Jay Kreps
> >><jay.kreps@gmail.com
> >> >
> >> > > > wrote:
> >> > > > > > >
> >> > > > > > > > Yeah I second the problem Guozhang flags with giving
> >>flush a
> >> > > > timeout.
> >> > > > > > In
> >> > > > > > > > general failover in Kafka is a bounded thing unless you
> >>have
> >> > > > brought
> >> > > > > > your
> >> > > > > > > > Kafka cluster down entirely so I think depending on that
> >> bound
> >> > > > > > implicitly
> >> > > > > > > > is okay.
> >> > > > > > > >
> >> > > > > > > > It is possible to make flush() be instead
> >> > > > > > > >   boolean tryFlush(long timeout, TimeUnit unit);
> >> > > > > > > >
> >> > > > > > > > But I am somewhat skeptical that people will use this
> >> > correctly.
> >> > > > I.e
> >> > > > > > > > consider the mirror maker code snippet I gave above, how
> >> would
> >> > one
> >> > > > > > > actually
> >> > > > > > > > recover in this case other than retrying (which the client
> >> > already
> >> > > > does
> >> > > > > > > > automatically)? After all if you are okay losing data then
> >> you
> >> > > > don't
> >> > > > > > need
> >> > > > > > > > to bother calling flush at all, you can just let the
> >>messages
> >> > be
> >> > > > sent
> >> > > > > > > > asynchronously.
> >> > > > > > > >
> >> > > > > > > > I think close() is actually different because you may well
> >> > want to
> >> > > > > > > shutdown
> >> > > > > > > > immediately and just throw away unsent events.
> >> > > > > > > >
> >> > > > > > > > -Jay
> >> > > > > > > >
> >> > > > > > > > On Mon, Feb 9, 2015 at 2:44 PM, Guozhang Wang <
> >> > wangguoz@gmail.com>
> >> > > > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > > The proposal looks good to me, will need some time to
> >> review
> >> > the
> >> > > > > > > > > implementation RB later.
> >> > > > > > > > >
> >> > > > > > > > > Bhavesh, I am wondering how you will use a flush() with
> >>a
> >> > timeout
> >> > > > > > since
> >> > > > > > > > > such a call does not actually provide any flushing
> >> > guarantees?
> >> > > > > > > > >
> >> > > > > > > > > As for close(), there is a separate JIRA for this:
> >> > > > > > > > >
> >> > > > > > > > > KAFKA-1660 <
> >> https://issues.apache.org/jira/browse/KAFKA-1660
> >> > >
> >> > > > > > > > >
> >> > > > > > > > > Guozhang
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <
> >> > > > > > > > mistry.p.bhavesh@gmail.com
> >> > > > > > > > > >
> >> > > > > > > > > wrote:
> >> > > > > > > > >
> >> > > > > > > > > > Hi Jay,
> >> > > > > > > > > >
> >> > > > > > > > > > How about adding timeout for each method calls
> >> > > > > > > flush(timeout,TimeUnit)
> >> > > > > > > > > and
> >> > > > > > > > > > close(timeout,TimeUNIT) ?  We had runway io thread
> >>issue
> >> > and
> >> > > > caller
> >> > > > > > > > > thread
> >> > > > > > > > > > should not blocked for ever for these methods ?
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > > Thanks,
> >> > > > > > > > > >
> >> > > > > > > > > > Bhavesh
> >> > > > > > > > > >
> >> > > > > > > > > > On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <
> >> > > > jay.kreps@gmail.com>
> >> > > > > > > > wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > > Well actually in the case of linger.ms = 0 the send
> >>is
> >> > still
> >> > > > > > > > > > asynchronous
> >> > > > > > > > > > > so calling flush() blocks until all the previously
> >>sent
> >> > > > records
> >> > > > > > > have
> >> > > > > > > > > > > completed. It doesn't speed anything up in that
> >>case,
> >> > though,
> >> > > > > > since
> >> > > > > > > > > they
> >> > > > > > > > > > > are already available to send.
> >> > > > > > > > > > >
> >> > > > > > > > > > > -Jay
> >> > > > > > > > > > >
> >> > > > > > > > > > > On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <
> >> > > > > > > gshapira@cloudera.com
> >> > > > > > > > >
> >> > > > > > > > > > > wrote:
> >> > > > > > > > > > >
> >> > > > > > > > > > > > Looks good to me.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > I like the idea of not blocking additional sends
> >>but
> >> > not
> >> > > > > > > > guaranteeing
> >> > > > > > > > > > > that
> >> > > > > > > > > > > > flush() will deliver them.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > I assume that with linger.ms = 0, flush will just
> >> be a
> >> > > > noop
> >> > > > > > > (since
> >> > > > > > > > > the
> >> > > > > > > > > > > > queue will be empty). Is that correct?
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Gwen
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <
> >> > > > > > jay.kreps@gmail.com>
> >> > > > > > > > > > wrote:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > > Following up on our previous thread on making
> >>batch
> >> > send
> >> > > > a
> >> > > > > > > little
> >> > > > > > > > > > > easier,
> >> > > > > > > > > > > > > here is a concrete proposal to add a flush()
> >>method
> >> > to
> >> > > > the
> >> > > > > > > > > producer:
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > >
> >> >
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > A proposed implementation is here:
> >> > > > > > > > > > > > >
> https://issues.apache.org/jira/browse/KAFKA-1865
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Thoughts?
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > -Jay
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > --
> >> > > > > > > > > -- Guozhang
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > >
> >> > > >
> >> >
> >> >
> >>
>
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Jiangjie Qin <jq...@linkedin.com.INVALID>.
I'm thinking the flush call timeout will naturally be the timeout for a
produce request, No?

Currently it seems we don't have a timeout for client requests, should we
have one?

-Jiangjie (Becket) Qin

On 2/16/15, 8:19 PM, "Jay Kreps" <ja...@gmail.com> wrote:

>Yes, I think we all agree it would be good to add a client-side request
>timeout. That would effectively imply a flush timeout as well since any
>requests that couldn't complete in that time would be errors and hence
>completed in the definition we gave.
>
>-Jay
>
>On Mon, Feb 16, 2015 at 7:57 PM, Bhavesh Mistry
><mi...@gmail.com>
>wrote:
>
>> Hi All,
>>
>> Thanks Jay and all  address concern.  I am fine with just having flush()
>> method as long as it covers failure mode and resiliency.  e.g We had
>> situation where entire Kafka cluster brokers were reachable, but upon
>> adding new kafka node and admin migrated "leader to new brokers"  that
>>new
>> brokers is NOT reachable from producer stand point due to fire wall but
>> metadata would continue to elect new broker as leader for that
>>partition.
>>
>> All I am asking is either you will have to give-up sending to this
>>broker
>> or do something in this scenario.  As for the current code 0.8.2
>>release,
>> caller thread of flush() or close() method would be blocked for ever....
>> so all I am asking is
>>
>> https://issues.apache.org/jira/browse/KAFKA-1659
>> https://issues.apache.org/jira/browse/KAFKA-1660
>>
>> Also, I recall that there is timeout also added to batch to indicate how
>> long "message" can retain in memory before expiring.
>>
>> Given,  all this should this API be consistent with others up coming
>> patches for addressing similar problem(s).
>>
>>
>> Otherwise, what we have done is spawn a thread for just calling close()
>>or
>> flush with timeout for join on caller end.
>>
>> Anyway, I just wanted to give you issues with existing API and if you
>>guys
>> think this is fine then, I am ok with this approach. It is just that
>>caller
>> will have to do bit more work.
>>
>>
>> Thanks,
>>
>> Bhavesh
>>
>> On Thursday, February 12, 2015, Joel Koshy <jj...@gmail.com> wrote:
>>
>> > Yes that is a counter-example. I'm okay either way on whether we
>> > should have just flush() or have a timeout. Bhavesh, does Jay's
>> > explanation a few replies prior address your concern? If so, shall we
>> > consider this closed?
>> >
>> > On Tue, Feb 10, 2015 at 01:36:23PM -0800, Jay Kreps wrote:
>> > > Yeah we could do that, I guess I just feel like it adds confusion
>> because
>> > > then you have to think about which timeout you want, when likely you
>> > don't
>> > > want a timeout at all.
>> > >
>> > > I guess the pattern I was thinking of was fflush or the java
>> equivalent,
>> > > which don't have timeouts:
>> > >
>> >
>> 
>>http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html#flush()
>> > >
>> > > -Jay
>> > >
>> > > On Tue, Feb 10, 2015 at 10:41 AM, Joel Koshy <jj...@gmail.com>
>> > wrote:
>> > >
>> > > > I think tryFlush with a timeout sounds good to me. This is really
>> more
>> > > > for consistency than anything else. I cannot think of any standard
>> > > > blocking calls off the top of my head that don't have a timed
>> variant.
>> > > > E.g., Thread.join, Object.wait, Future.get Either that, or they
>> > > > provide an entirely non-blocking mode (e.g., socketChannel.connect
>> > > > followed by finishConnect)
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Joel
>> > > >
>> > > > On Tue, Feb 10, 2015 at 11:30:47AM -0500, Joe Stein wrote:
>> > > > > Jay,
>> > > > >
>> > > > > The .flush() call seems like it would be the best way if you
>>wanted
>> > > > to-do a
>> > > > > clean shutdown of the new producer?
>> > > > >
>> > > > > So, you could in your code "stop all incoming requests &&
>> > > > producer.flush()
>> > > > > && system.exit(value)" and know pretty much you won't drop
>>anything
>> > on
>> > > > the
>> > > > > floor.
>> > > > >
>> > > > > This can be done with the callbacks and futures (sure) but
>>.flush()
>> > seems
>> > > > > to be the right time to block and a few lines of code, no?
>> > > > >
>> > > > > ~ Joestein
>> > > > >
>> > > > > On Tue, Feb 10, 2015 at 11:25 AM, Jay Kreps
>><ja...@gmail.com>
>> > wrote:
>> > > > >
>> > > > > > Hey Bhavesh,
>> > > > > >
>> > > > > > If a broker is not available a new one should be elected to
>>take
>> > over,
>> > > > so
>> > > > > > although the flush might take longer it should still be quick.
>> > Even if
>> > > > not
>> > > > > > this should result in an error not a hang.
>> > > > > >
>> > > > > > The cases you enumerated are all covered already--if the user
>> > wants to
>> > > > > > retry that is covered by the retry setting in the client, for
>>all
>> > the
>> > > > > > errors that is considered completion of the request. The post
>> > > > condition of
>> > > > > > flush isn't that all sends complete successfully, just that
>>they
>> > > > complete.
>> > > > > > So if you try to send a message that is too big, when flush
>> returns
>> > > > calling
>> > > > > > .get() on the future should not block and should produce the
>> error.
>> > > > > >
>> > > > > > Basically the argument I am making is that the only reason you
>> > want to
>> > > > call
>> > > > > > flush() is to guarantee all the sends complete so if it
>>doesn't
>> > > > guarantee
>> > > > > > that it will be somewhat confusing. This does mean blocking,
>>but
>> > if you
>> > > > > > don't want to block on the send then you wouldn't call
>>flush().
>> > > > > >
>> > > > > > This has no impact on the block.on.buffer full setting. That
>> > impacts
>> > > > what
>> > > > > > happens when send() can't append to the buffer because it is
>> full.
>> > > > flush()
>> > > > > > means any message previously sent (i.e. for which send() call
>>has
>> > > > returned)
>> > > > > > needs to have its request completed. Hope that makes sense.
>> > > > > >
>> > > > > > -Jay
>> > > > > >
>> > > > > > On Mon, Feb 9, 2015 at 11:52 PM, Bhavesh Mistry <
>> > > > > > mistry.p.bhavesh@gmail.com>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > HI Jay,
>> > > > > > >
>> > > > > > > Imagine, if you have flaky network connection to brokers,
>>and
>> if
>> > > > flush()
>> > > > > > > will be blocked if "one of broker is not available" (
>>basically
>> > How
>> > > > would
>> > > > > > > be address failure mode and io thread not able to drain
>>records
>> > or
>> > > > busy
>> > > > > > due
>> > > > > > > to pending request". Do you flush() method is only to flush
>>to
>> > in mem
>> > > > > > queue
>> > > > > > > or flush to broker over the network().
>> > > > > > >
>> > > > > > > Timeout helps with and pushing caller to handle what to do
>>?
>> > e.g
>> > > > > > > re-enqueue records, drop entire batch or one of message is
>>too
>> > big
>> > > > cross
>> > > > > > > the limit of max.message.size etc...
>> > > > > > >
>> > > > > > > Also, according to java doc for API  "The method will block
>> > until all
>> > > > > > > previously sent records have completed sending (either
>> > successfully
>> > > > or
>> > > > > > with
>> > > > > > > an error)", does this by-pass rule set by for
>> > block.on.buffer.full or
>> > > > > > > batch.size
>> > > > > > > when under load.
>> > > > > > >
>> > > > > > > That was my intention, and I am sorry I mixed-up close()
>>method
>> > here
>> > > > > > > without knowing that this is only for bulk send.
>> > > > > > >
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > >
>> > > > > > > Bhavesh
>> > > > > > >
>> > > > > > > On Mon, Feb 9, 2015 at 8:17 PM, Jay Kreps
>><jay.kreps@gmail.com
>> >
>> > > > wrote:
>> > > > > > >
>> > > > > > > > Yeah I second the problem Guozhang flags with giving
>>flush a
>> > > > timeout.
>> > > > > > In
>> > > > > > > > general failover in Kafka is a bounded thing unless you
>>have
>> > > > brought
>> > > > > > your
>> > > > > > > > Kafka cluster down entirely so I think depending on that
>> bound
>> > > > > > implicitly
>> > > > > > > > is okay.
>> > > > > > > >
>> > > > > > > > It is possible to make flush() be instead
>> > > > > > > >   boolean tryFlush(long timeout, TimeUnit unit);
>> > > > > > > >
>> > > > > > > > But I am somewhat skeptical that people will use this
>> > correctly.
>> > > > I.e
>> > > > > > > > consider the mirror maker code snippet I gave above, how
>> would
>> > one
>> > > > > > > actually
>> > > > > > > > recover in this case other than retrying (which the client
>> > already
>> > > > does
>> > > > > > > > automatically)? After all if you are okay losing data then
>> you
>> > > > don't
>> > > > > > need
>> > > > > > > > to bother calling flush at all, you can just let the
>>messages
>> > be
>> > > > sent
>> > > > > > > > asynchronously.
>> > > > > > > >
>> > > > > > > > I think close() is actually different because you may well
>> > want to
>> > > > > > > shutdown
>> > > > > > > > immediately and just throw away unsent events.
>> > > > > > > >
>> > > > > > > > -Jay
>> > > > > > > >
>> > > > > > > > On Mon, Feb 9, 2015 at 2:44 PM, Guozhang Wang <
>> > wangguoz@gmail.com>
>> > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > The proposal looks good to me, will need some time to
>> review
>> > the
>> > > > > > > > > implementation RB later.
>> > > > > > > > >
>> > > > > > > > > Bhavesh, I am wondering how you will use a flush() with
>>a
>> > timeout
>> > > > > > since
>> > > > > > > > > such a call does not actually provide any flushing
>> > guarantees?
>> > > > > > > > >
>> > > > > > > > > As for close(), there is a separate JIRA for this:
>> > > > > > > > >
>> > > > > > > > > KAFKA-1660 <
>> https://issues.apache.org/jira/browse/KAFKA-1660
>> > >
>> > > > > > > > >
>> > > > > > > > > Guozhang
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <
>> > > > > > > > mistry.p.bhavesh@gmail.com
>> > > > > > > > > >
>> > > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Jay,
>> > > > > > > > > >
>> > > > > > > > > > How about adding timeout for each method calls
>> > > > > > > flush(timeout,TimeUnit)
>> > > > > > > > > and
>> > > > > > > > > > close(timeout,TimeUNIT) ?  We had runway io thread
>>issue
>> > and
>> > > > caller
>> > > > > > > > > thread
>> > > > > > > > > > should not blocked for ever for these methods ?
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > Thanks,
>> > > > > > > > > >
>> > > > > > > > > > Bhavesh
>> > > > > > > > > >
>> > > > > > > > > > On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <
>> > > > jay.kreps@gmail.com>
>> > > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Well actually in the case of linger.ms = 0 the send
>>is
>> > still
>> > > > > > > > > > asynchronous
>> > > > > > > > > > > so calling flush() blocks until all the previously
>>sent
>> > > > records
>> > > > > > > have
>> > > > > > > > > > > completed. It doesn't speed anything up in that
>>case,
>> > though,
>> > > > > > since
>> > > > > > > > > they
>> > > > > > > > > > > are already available to send.
>> > > > > > > > > > >
>> > > > > > > > > > > -Jay
>> > > > > > > > > > >
>> > > > > > > > > > > On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <
>> > > > > > > gshapira@cloudera.com
>> > > > > > > > >
>> > > > > > > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Looks good to me.
>> > > > > > > > > > > >
>> > > > > > > > > > > > I like the idea of not blocking additional sends
>>but
>> > not
>> > > > > > > > guaranteeing
>> > > > > > > > > > > that
>> > > > > > > > > > > > flush() will deliver them.
>> > > > > > > > > > > >
>> > > > > > > > > > > > I assume that with linger.ms = 0, flush will just
>> be a
>> > > > noop
>> > > > > > > (since
>> > > > > > > > > the
>> > > > > > > > > > > > queue will be empty). Is that correct?
>> > > > > > > > > > > >
>> > > > > > > > > > > > Gwen
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <
>> > > > > > jay.kreps@gmail.com>
>> > > > > > > > > > wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Following up on our previous thread on making
>>batch
>> > send
>> > > > a
>> > > > > > > little
>> > > > > > > > > > > easier,
>> > > > > > > > > > > > > here is a concrete proposal to add a flush()
>>method
>> > to
>> > > > the
>> > > > > > > > > producer:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > >
>> >
>> 
>>https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > A proposed implementation is here:
>> > > > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-1865
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thoughts?
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > -Jay
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > --
>> > > > > > > > > -- Guozhang
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > >
>> > > >
>> >
>> >
>>


Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Jay Kreps <ja...@gmail.com>.
Yes, I think we all agree it would be good to add a client-side request
timeout. That would effectively imply a flush timeout as well since any
requests that couldn't complete in that time would be errors and hence
completed in the definition we gave.

-Jay
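
As a rough illustration of that definition, the sketch below models it in
plain Java. It is a toy model under stated assumptions, not the KafkaProducer
implementation: FlushSemanticsSketch is a hypothetical class, each send is
represented by a CompletableFuture, and a background thread stands in for the
I/O thread. The point it shows is that flush() waits for completion, not for
success, so a send that fails (for example with a request timeout) still lets
flush() return.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Toy model of the flush() post-condition discussed in this thread.
// Hypothetical class; NOT the KafkaProducer implementation.
class FlushSemanticsSketch {
    private final List<CompletableFuture<Long>> inFlight = new ArrayList<>();

    // Models send(): returns immediately with a future for the record's result.
    synchronized CompletableFuture<Long> send(String record) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        inFlight.add(result);
        return result;
    }

    // Models flush(): blocks until every previously sent record has
    // completed, either successfully or with an error.
    void flush() {
        List<CompletableFuture<Long>> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(inFlight);
            inFlight.clear();
        }
        for (CompletableFuture<Long> f : snapshot) {
            // handle() maps both outcomes to a normal result, so a failed
            // send counts as completed and does not throw here.
            f.handle((value, error) -> null).join();
        }
    }

    public static void main(String[] args) {
        FlushSemanticsSketch producer = new FlushSemanticsSketch();
        CompletableFuture<Long> ok = producer.send("small record");
        CompletableFuture<Long> failed = producer.send("record whose request times out");

        // Stand-in for the I/O thread: one acknowledgement, one timeout error.
        new Thread(() -> {
            ok.complete(42L);
            failed.completeExceptionally(new TimeoutException("request timed out"));
        }).start();

        producer.flush(); // returns once both futures are complete
        System.out.println(ok.isDone() + " " + failed.isCompletedExceptionally());
    }
}
```

After flush() returns, calling get() on either future does not block; the
failed one reports its error to the caller at that point.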

On Mon, Feb 16, 2015 at 7:57 PM, Bhavesh Mistry <mi...@gmail.com>
wrote:

> Hi All,
>
> Thanks Jay and all  address concern.  I am fine with just having flush()
> method as long as it covers failure mode and resiliency.  e.g We had
> situation where entire Kafka cluster brokers were reachable, but upon
> adding new kafka node and admin migrated "leader to new brokers"  that new
> brokers is NOT reachable from producer stand point due to fire wall but
> metadata would continue to elect new broker as leader for that partition.
>
> All I am asking is either you will have to give-up sending to this broker
> or do something in this scenario.  As for the current code 0.8.2 release,
> caller thread of flush() or close() method would be blocked for ever....
> so all I am asking is
>
> https://issues.apache.org/jira/browse/KAFKA-1659
> https://issues.apache.org/jira/browse/KAFKA-1660
>
> Also, I recall that there is timeout also added to batch to indicate how
> long "message" can retain in memory before expiring.
>
> Given,  all this should this API be consistent with others up coming
> patches for addressing similar problem(s).
>
>
> Otherwise, what we have done is spawn a thread for just calling close() or
> flush with timeout for join on caller end.
>
> Anyway, I just wanted to give you issues with existing API and if you guys
> think this is fine then, I am ok with this approach. It is just that caller
> will have to do bit more work.
>
>
> Thanks,
>
> Bhavesh
>

Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Bhavesh Mistry <mi...@gmail.com>.
Hi All,

Thanks, Jay and all, for addressing my concern. I am fine with having just a
flush() method as long as it covers failure modes and resiliency. For
example, we had a situation where all brokers in the Kafka cluster were
reachable, but after a new Kafka node was added and the admin migrated
leadership to it, the new broker was NOT reachable from the producer's
standpoint because of a firewall, while the metadata continued to list the
new broker as leader for that partition.

All I am asking is that in this scenario the producer either gives up
sending to this broker or does something else about it. With the current
code in the 0.8.2 release, the thread calling flush() or close() would be
blocked forever, so all I am asking for is:

https://issues.apache.org/jira/browse/KAFKA-1659
https://issues.apache.org/jira/browse/KAFKA-1660

Also, I recall that a timeout is being added to batches to indicate how
long a message can remain in memory before expiring.

Given all this, should this API be consistent with the other upcoming
patches that address similar problems?


Otherwise, what we have done is spawn a thread just for calling close() or
flush(), and join it with a timeout on the caller's end.

Anyway, I just wanted to point out the issues with the existing API. If you
guys think this is fine, then I am OK with this approach. It is just that
the caller will have to do a bit more work.


Thanks,

Bhavesh
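
The caller-side workaround mentioned in this message (run the blocking call
on its own thread and join it with a timeout) could look roughly like the
sketch below. This is only an illustration under stated assumptions:
BoundedBlockingCall and runWithTimeout are hypothetical names, not part of
the Kafka client, and the Runnable stands in for a real producer.flush() or
producer.close() call.

```java
import java.util.concurrent.TimeUnit;

public class BoundedBlockingCall {

    /**
     * Runs a potentially blocking action on a helper thread and waits at most
     * the given time for it. Returns true if the action finished in time.
     */
    public static boolean runWithTimeout(Runnable blockingAction, long timeout, TimeUnit unit) {
        Thread worker = new Thread(blockingAction, "bounded-blocking-call");
        // Daemon thread, so an action that never returns cannot keep the JVM alive.
        worker.setDaemon(true);
        worker.start();
        try {
            // join(0) would wait forever, so always wait at least one millisecond.
            worker.join(Math.max(1L, unit.toMillis(timeout)));
        } catch (InterruptedException e) {
            // Preserve the caller's interrupt status and stop waiting.
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        // Stand-in for a flush() that completes promptly.
        boolean fast = runWithTimeout(() -> { }, 1, TimeUnit.SECONDS);

        // Stand-in for a flush() that hangs, e.g. because the leader is unreachable.
        boolean stuck = runWithTimeout(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 100, TimeUnit.MILLISECONDS);

        System.out.println("fast call finished in time: " + fast);   // true
        System.out.println("stuck call finished in time: " + stuck); // false
    }
}
```

Note that the timeout here only bounds how long the caller waits. The helper
thread stays blocked, and a false result says nothing about which records
were actually delivered, so the caller still has to decide what to do next.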

On Thursday, February 12, 2015, Joel Koshy <jj...@gmail.com> wrote:

> Yes that is a counter-example. I'm okay either way on whether we
> should have just flush() or have a timeout. Bhavesh, does Jay's
> explanation a few replies prior address your concern? If so, shall we
> consider this closed?
>

Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Joel Koshy <jj...@gmail.com>.
Yes that is a counter-example. I'm okay either way on whether we
should have just flush() or have a timeout. Bhavesh, does Jay's
explanation a few replies prior address your concern? If so, shall we
consider this closed?

On Tue, Feb 10, 2015 at 01:36:23PM -0800, Jay Kreps wrote:
> Yeah we could do that, I guess I just feel like it adds confusion because
> then you have to think about which timeout you want, when likely you don't
> want a timeout at all.
> 
> I guess the pattern I was thinking of was fflush or the java equivalent,
> which don't have timeouts:
> http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html#flush()
> 
> -Jay
> 


Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Jay Kreps <ja...@gmail.com>.
Yeah we could do that, I guess I just feel like it adds confusion because
then you have to think about which timeout you want, when likely you don't
want a timeout at all.

I guess the pattern I was thinking of was fflush or the java equivalent,
which don't have timeouts:
http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html#flush()

-Jay
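
To make the two API shapes under discussion concrete, here is a toy sketch
of an untimed flush() next to a timed tryFlush(timeout, unit). It is not the
Kafka implementation: InFlightTracker, recordSent and recordCompleted are
hypothetical names used only for illustration, and only the tryFlush
signature is taken from this thread. As a simplification, both methods wait
until nothing at all is in flight, including records sent after the call
started.

```java
import java.util.concurrent.TimeUnit;

public class InFlightTracker {
    // Number of records accepted by send() whose requests have not completed yet.
    private int inFlight = 0;

    /** Call when send() has accepted a record. */
    public synchronized void recordSent() {
        inFlight++;
    }

    /** Call when a request completes, either successfully or with an error. */
    public synchronized void recordCompleted() {
        inFlight--;
        if (inFlight == 0) {
            notifyAll();
        }
    }

    /** Untimed shape: returns only once nothing is in flight. */
    public synchronized void flush() {
        try {
            while (inFlight > 0) {
                wait();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while flushing", e);
        }
    }

    /** Timed shape: returns false if records are still in flight at the deadline. */
    public synchronized boolean tryFlush(long timeout, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        try {
            while (inFlight > 0) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    return false;
                }
                TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        InFlightTracker tracker = new InFlightTracker();
        tracker.recordSent();
        System.out.println(tracker.tryFlush(100, TimeUnit.MILLISECONDS)); // false
        tracker.recordCompleted();
        System.out.println(tracker.tryFlush(100, TimeUnit.MILLISECONDS)); // true
        tracker.flush(); // returns immediately, nothing is in flight
    }
}
```

With the timed shape, every caller has to pick a timeout and handle a false
return, which is the extra decision the message above is wary of.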

On Tue, Feb 10, 2015 at 10:41 AM, Joel Koshy <jj...@gmail.com> wrote:

> I think tryFlush with a timeout sounds good to me. This is really more
> for consistency than anything else. I cannot think of any standard
> blocking calls off the top of my head that don't have a timed variant.
> E.g., Thread.join, Object.wait, Future.get. Either that, or they
> provide an entirely non-blocking mode (e.g., socketChannel.connect
> followed by finishConnect).
>
> Thanks,
>
> Joel

Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Joel Koshy <jj...@gmail.com>.
I think tryFlush with a timeout sounds good to me. This is really more
for consistency than anything else. I cannot think of any standard
blocking calls off the top of my head that don't have a timed variant.
E.g., Thread.join, Object.wait, Future.get. Either that, or they
provide an entirely non-blocking mode (e.g., SocketChannel.connect
followed by finishConnect).

Thanks,

Joel
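
As an illustration of the timed variants listed above, here is a small
sketch using only java.util.concurrent. It shows Future.get(timeout, unit)
returning control to the caller while the task is still running. The class
name, the 50 ms timeout, and the latch are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedBlockingDemo {
    // Returns true if the timed get() gave up before the task finished.
    static boolean timedGetTimesOut() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            Future<String> f = pool.submit(() -> {
                release.await();        // the task stays blocked until it is released
                return "done";
            });
            try {
                f.get(50, TimeUnit.MILLISECONDS);   // timed variant of Future.get()
                return false;
            } catch (TimeoutException e) {
                return true;            // caller regains control and must decide what to do next
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {
            release.countDown();        // let the task finish so the pool can shut down
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("timed get() timed out: " + timedGetTimesOut());
    }
}
```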

On Tue, Feb 10, 2015 at 11:30:47AM -0500, Joe Stein wrote:
> Jay,
> 
> The .flush() call seems like it would be the best way to do a clean
> shutdown of the new producer?
> 
> So, in your code you could "stop all incoming requests && producer.flush()
> && System.exit(value)" and be pretty sure you won't drop anything on the
> floor.
> 
> This can be done with the callbacks and futures (sure), but .flush() seems
> like the right place to block, and it takes only a few lines of code, no?
> 
> ~ Joestein


Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Joe Stein <jo...@stealth.ly>.
Jay,

The .flush() call seems like it would be the best way to do a clean
shutdown of the new producer?

So, in your code you could "stop all incoming requests && producer.flush()
&& System.exit(value)" and be pretty sure you won't drop anything on the
floor.

This can be done with the callbacks and futures (sure), but .flush() seems
like the right place to block, and it takes only a few lines of code, no?

~ Joestein

On Tue, Feb 10, 2015 at 11:25 AM, Jay Kreps <ja...@gmail.com> wrote:

> Hey Bhavesh,
>
> If a broker is not available, a new one should be elected to take over, so
> although the flush might take longer it should still be quick. Even if not,
> this should result in an error, not a hang.
>
> The cases you enumerated are all covered already. If the user wants to
> retry, that is covered by the retry setting in the client; for any of those
> errors, failing counts as completion of the request. The postcondition of
> flush isn't that all sends complete successfully, just that they complete.
> So if you try to send a message that is too big, then once flush returns,
> calling .get() on the future should not block and should produce the error.
>
> Basically the argument I am making is that the only reason you want to call
> flush() is to guarantee all the sends complete so if it doesn't guarantee
> that it will be somewhat confusing. This does mean blocking, but if you
> don't want to block on the send then you wouldn't call flush().
>
> This has no impact on the block.on.buffer.full setting. That impacts what
> happens when send() can't append to the buffer because it is full. flush()
> means any message previously sent (i.e., for which the send() call has
> returned) needs to have its request completed. Hope that makes sense.
>
> -Jay

Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Jay Kreps <ja...@gmail.com>.
Hey Bhavesh,

If a broker is not available, a new one should be elected to take over, so
although the flush might take longer it should still be quick. Even if not,
this should result in an error, not a hang.

The cases you enumerated are all covered already. If the user wants to
retry, that is covered by the retry setting in the client; for any of those
errors, failing counts as completion of the request. The postcondition of
flush isn't that all sends complete successfully, just that they complete.
So if you try to send a message that is too big, then once flush returns,
calling .get() on the future should not block and should produce the error.

Basically the argument I am making is that the only reason you want to call
flush() is to guarantee all the sends complete so if it doesn't guarantee
that it will be somewhat confusing. This does mean blocking, but if you
don't want to block on the send then you wouldn't call flush().

This has no impact on the block.on.buffer.full setting. That impacts what
happens when send() can't append to the buffer because it is full. flush()
means any message previously sent (i.e., for which the send() call has
returned) needs to have its request completed. Hope that makes sense.

-Jay
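
The postcondition described above can be sketched with a toy stand-in.
This is not the Kafka client: ToyProducer, its size limit, and the error
message are all hypothetical, and the "network" is just a list of queued
tasks. The point is only that after flush() every future is complete,
whether with a value or with an error, so get() no longer blocks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FlushPostConditionDemo {
    // Toy stand-in for a producer; all names here are hypothetical.
    static class ToyProducer {
        private final int maxMessageBytes;
        private final List<Runnable> pending = new ArrayList<>();

        ToyProducer(int maxMessageBytes) { this.maxMessageBytes = maxMessageBytes; }

        // Asynchronous send: only queues the work and returns a future.
        CompletableFuture<Integer> send(byte[] payload) {
            CompletableFuture<Integer> result = new CompletableFuture<>();
            pending.add(() -> {
                if (payload.length > maxMessageBytes) {
                    result.completeExceptionally(
                        new IllegalArgumentException("message too large"));
                } else {
                    result.complete(payload.length);
                }
            });
            return result;
        }

        // Completes every previously queued send, successfully or with an error.
        void flush() {
            for (Runnable r : pending) r.run();
            pending.clear();
        }
    }

    // Returns "<any done before flush>,<all done after flush>,<error seen by get()>".
    static String observeAfterFlush() {
        ToyProducer producer = new ToyProducer(4);
        CompletableFuture<Integer> ok = producer.send(new byte[2]);
        CompletableFuture<Integer> tooBig = producer.send(new byte[10]);
        boolean doneBefore = ok.isDone() || tooBig.isDone();
        producer.flush();
        boolean doneAfter = ok.isDone() && tooBig.isDone();
        String error;
        try {
            tooBig.get();               // does not block: the future is already complete
            error = "none";
        } catch (ExecutionException e) {
            error = e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return doneBefore + "," + doneAfter + "," + error;
    }

    public static void main(String[] args) {
        System.out.println(observeAfterFlush());
    }
}
```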

On Mon, Feb 9, 2015 at 11:52 PM, Bhavesh Mistry <mi...@gmail.com>
wrote:

> Hi Jay,
>
> Imagine you have a flaky network connection to the brokers, and flush()
> blocks because one of the brokers is not available. Basically, how would we
> address failure modes where the io thread is not able to drain records, or
> is busy due to pending requests? Does the flush() method only flush to the
> in-memory queue, or does it flush to the broker over the network?
>
> A timeout helps here by pushing the caller to decide what to do, e.g.
> re-enqueue the records, drop the entire batch, or handle the case where one
> message is too big and crosses the max.message.size limit, etc.
>
> Also, according to the javadoc for the API, "The method will block until all
> previously sent records have completed sending (either successfully or with
> an error)". Does this bypass the rules set by block.on.buffer.full or
> batch.size when under load?
>
> That was my intention, and I am sorry I mixed up the close() method here
> without knowing that this is only for bulk send.
>
>
> Thanks,
>
> Bhavesh

Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Bhavesh Mistry <mi...@gmail.com>.
Hi Jay,

Imagine you have a flaky network connection to the brokers, and flush()
blocks because one of the brokers is not available. Basically, how would we
address failure modes where the io thread is not able to drain records, or
is busy due to pending requests? Does the flush() method only flush to the
in-memory queue, or does it flush to the broker over the network?

A timeout helps here by pushing the caller to decide what to do, e.g.
re-enqueue the records, drop the entire batch, or handle the case where one
message is too big and crosses the max.message.size limit, etc.

Also, according to the javadoc for the API, "The method will block until all
previously sent records have completed sending (either successfully or with
an error)". Does this bypass the rules set by block.on.buffer.full or
batch.size when under load?

That was my intention, and I am sorry I mixed up the close() method here
without knowing that this is only for bulk send.


Thanks,

Bhavesh

On Mon, Feb 9, 2015 at 8:17 PM, Jay Kreps <ja...@gmail.com> wrote:

> Yeah I second the problem Guozhang flags with giving flush a timeout. In
> general failover in Kafka is a bounded thing unless you have brought your
> Kafka cluster down entirely so I think depending on that bound implicitly
> is okay.
>
> It is possible to make flush() instead be
>   boolean tryFlush(long timeout, TimeUnit unit);
>
> But I am somewhat skeptical that people will use this correctly. I.e.,
> consider the mirror maker code snippet I gave above: how would one actually
> recover in this case other than retrying (which the client already does
> automatically)? After all, if you are okay losing data then you don't need
> to bother calling flush at all; you can just let the messages be sent
> asynchronously.
>
> I think close() is actually different because you may well want to shut down
> immediately and just throw away unsent events.
>
> -Jay

Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Jay Kreps <ja...@gmail.com>.
Yeah, I second the problem Guozhang flags with giving flush a timeout. In
general, failover in Kafka is a bounded thing unless you have brought your
Kafka cluster down entirely, so I think depending on that bound implicitly
is okay.

It is possible to make flush() be instead
  boolean tryFlush(long timeout, TimeUnit unit);

But I am somewhat skeptical that people will use this correctly. I.e.,
consider the mirror maker code snippet I gave above: how would one actually
recover in this case other than by retrying (which the client already does
automatically)? After all, if you are okay losing data then you don't need
to bother calling flush at all; you can just let the messages be sent
asynchronously.
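
To make that concern concrete, here is a minimal Java sketch of what a caller
of a timed flush might end up writing. Everything in it is an assumption for
illustration: TimedFlusher, tryFlush and flushWithRetries are hypothetical
names, not part of the producer API, and pending sends are modeled with plain
CompletableFutures rather than a real producer.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a producer with a timed flush; not a Kafka API.
class TimedFlusher {

    // Returns true once every pending send has completed (successfully or
    // with an error); false if the timeout expires or the caller is interrupted.
    static boolean tryFlush(List<CompletableFuture<Void>> pending,
                            long timeout, TimeUnit unit) {
        CompletableFuture<Void> all =
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]));
        try {
            all.get(timeout, unit);
            return true;
        } catch (ExecutionException e) {
            // All sends finished, at least one with an error: still "completed".
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // What a mirror-maker-style caller can do with the result: on false there
    // is little left to do except wait again. Returns the attempt that
    // succeeded, or -1 if none did.
    static int flushWithRetries(List<CompletableFuture<Void>> pending,
                                long timeout, TimeUnit unit, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (tryFlush(pending, timeout, unit)) {
                return attempt; // safe to commit consumer offsets now
            }
        }
        return -1; // still not flushed; committing here would risk losing data
    }

    public static void main(String[] args) {
        CompletableFuture<Void> send = new CompletableFuture<>();
        // The send is still in flight, so the timed flush gives up.
        System.out.println(tryFlush(List.of(send), 10, TimeUnit.MILLISECONDS)); // false
        send.complete(null);
        System.out.println(tryFlush(List.of(send), 10, TimeUnit.MILLISECONDS)); // true
    }
}
```

In this sketch the retry loop is just a blocking flush() with extra steps,
which is the point being made: the timeout gives the caller a boolean but no
new way to recover.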

I think close() is actually different because you may well want to shut down
immediately and just throw away unsent events.

-Jay

On Mon, Feb 9, 2015 at 2:44 PM, Guozhang Wang <wa...@gmail.com> wrote:

> The proposal looks good to me, will need some time to review the
> implementation RB later.
>
> Bhavesh, I am wondering how you will use a flush() with a timeout since
> such a call does not actually provide any flushing guarantees?
>
> As for close(), there is a separate JIRA for this:
>
> KAFKA-1660 <https://issues.apache.org/jira/browse/KAFKA-1660>
>
> Guozhang
>
>
> On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com>
> wrote:
>
> > Hi Jay,
> >
> > How about adding a timeout to each of these method calls, i.e.
> > flush(timeout, TimeUnit) and close(timeout, TimeUnit)? We had a runaway
> > I/O thread issue, and the caller thread should not be blocked forever
> > in these methods.
> >
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > Well actually in the case of linger.ms = 0 the send is still
> > asynchronous
> > > so calling flush() blocks until all the previously sent records have
> > > completed. It doesn't speed anything up in that case, though, since
> they
> > > are already available to send.
> > >
> > > -Jay
> > >
> > > On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gs...@cloudera.com>
> > > wrote:
> > >
> > > > Looks good to me.
> > > >
> > > > I like the idea of not blocking additional sends but not guaranteeing
> > > that
> > > > flush() will deliver them.
> > > >
> > > > I assume that with linger.ms = 0, flush will just be a noop (since
> the
> > > > queue will be empty). Is that correct?
> > > >
> > > > Gwen
> > > >
> > > > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <ja...@gmail.com>
> > wrote:
> > > >
> > > > > Following up on our previous thread on making batch send a little
> > > easier,
> > > > > here is a concrete proposal to add a flush() method to the
> producer:
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> > > > >
> > > > > A proposed implementation is here:
> > > > > https://issues.apache.org/jira/browse/KAFKA-1865
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > -Jay
> > > > >
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Guozhang Wang <wa...@gmail.com>.
The proposal looks good to me, will need some time to review the
implementation RB later.

Bhavesh, I am wondering how you will use a flush() with a timeout since
such a call does not actually provide any flushing guarantees?

As for close(), there is a separate JIRA for this:

KAFKA-1660 <https://issues.apache.org/jira/browse/KAFKA-1660>

Guozhang


On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <mi...@gmail.com>
wrote:

> Hi Jay,
>
> How about adding a timeout to each of these method calls, i.e.
> flush(timeout, TimeUnit) and close(timeout, TimeUnit)? We had a runaway
> I/O thread issue, and the caller thread should not be blocked forever
> in these methods.
>
>
> Thanks,
>
> Bhavesh
>
> On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Well actually in the case of linger.ms = 0 the send is still
> asynchronous
> > so calling flush() blocks until all the previously sent records have
> > completed. It doesn't speed anything up in that case, though, since they
> > are already available to send.
> >
> > -Jay
> >
> > On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gs...@cloudera.com>
> > wrote:
> >
> > > Looks good to me.
> > >
> > > I like the idea of not blocking additional sends but not guaranteeing
> > that
> > > flush() will deliver them.
> > >
> > > I assume that with linger.ms = 0, flush will just be a noop (since the
> > > queue will be empty). Is that correct?
> > >
> > > Gwen
> > >
> > > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <ja...@gmail.com>
> wrote:
> > >
> > > > Following up on our previous thread on making batch send a little
> > easier,
> > > > here is a concrete proposal to add a flush() method to the producer:
> > > >
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> > > >
> > > > A proposed implementation is here:
> > > > https://issues.apache.org/jira/browse/KAFKA-1865
> > > >
> > > > Thoughts?
> > > >
> > > > -Jay
> > > >
> > >
> >
>



-- 
-- Guozhang

Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Bhavesh Mistry <mi...@gmail.com>.
Hi Jay,

How about adding a timeout to each of these method calls, i.e.
flush(timeout, TimeUnit) and close(timeout, TimeUnit)? We had a runaway
I/O thread issue, and the caller thread should not be blocked forever
in these methods.


Thanks,

Bhavesh

On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <ja...@gmail.com> wrote:

> Well actually in the case of linger.ms = 0 the send is still asynchronous
> so calling flush() blocks until all the previously sent records have
> completed. It doesn't speed anything up in that case, though, since they
> are already available to send.
>
> -Jay
>
> On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gs...@cloudera.com>
> wrote:
>
> > Looks good to me.
> >
> > I like the idea of not blocking additional sends but not guaranteeing
> that
> > flush() will deliver them.
> >
> > I assume that with linger.ms = 0, flush will just be a noop (since the
> > queue will be empty). Is that correct?
> >
> > Gwen
> >
> > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > Following up on our previous thread on making batch send a little
> easier,
> > > here is a concrete proposal to add a flush() method to the producer:
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> > >
> > > A proposed implementation is here:
> > > https://issues.apache.org/jira/browse/KAFKA-1865
> > >
> > > Thoughts?
> > >
> > > -Jay
> > >
> >
>

Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Jay Kreps <ja...@gmail.com>.
Well actually in the case of linger.ms = 0 the send is still asynchronous
so calling flush() blocks until all the previously sent records have
completed. It doesn't speed anything up in that case, though, since they
are already available to send.
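
A toy Java model may help illustrate the behavior described here. It is only
a sketch under stated assumptions: ToyProducer is a hypothetical class, not
the Kafka producer, a single background thread stands in for the I/O thread,
and a 20 ms sleep stands in for the network round trip. There is no lingering
at all, yet send() still returns before the record is delivered, and flush()
waits for whatever was sent before it was called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy model only, not the Kafka producer: one background thread plays the
// role of the I/O thread, and records are never held back to form batches.
class ToyProducer implements AutoCloseable {
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
    private final List<Future<?>> inFlight = new ArrayList<>();
    private final List<String> delivered = new CopyOnWriteArrayList<>();

    // Returns immediately; the record is delivered later on the I/O thread.
    synchronized Future<?> send(String record) {
        Future<?> result = ioThread.submit(() -> {
            try {
                Thread.sleep(20); // pretend network round trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            delivered.add(record);
        });
        inFlight.add(result);
        return result;
    }

    // Blocks until every record sent before this call has completed.
    // Records sent while flush() is waiting are not covered by it.
    void flush() {
        List<Future<?>> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(inFlight);
        }
        for (Future<?> f : snapshot) {
            try {
                f.get();
            } catch (ExecutionException e) {
                // Completed with an error; that still counts as completed.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        synchronized (this) {
            inFlight.removeIf(Future::isDone);
        }
    }

    // Copy of the records delivered so far, in delivery order.
    List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    @Override
    public void close() {
        ioThread.shutdown();
    }

    public static void main(String[] args) {
        try (ToyProducer producer = new ToyProducer()) {
            producer.send("a");
            producer.send("b");
            // Usually 0 at this point, since both sends are still in flight.
            System.out.println(producer.delivered().size());
            producer.flush();
            System.out.println(producer.delivered()); // [a, b]
        }
    }
}
```

In this model flush() adds no speed-up, because nothing was waiting on a
linger timer; it only gives the caller a point at which earlier sends are
known to be complete.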

-Jay

On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gs...@cloudera.com> wrote:

> Looks good to me.
>
> I like the idea of not blocking additional sends but not guaranteeing that
> flush() will deliver them.
>
> I assume that with linger.ms = 0, flush will just be a noop (since the
> queue will be empty). Is that correct?
>
> Gwen
>
> On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Following up on our previous thread on making batch send a little easier,
> > here is a concrete proposal to add a flush() method to the producer:
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> >
> > A proposed implementation is here:
> > https://issues.apache.org/jira/browse/KAFKA-1865
> >
> > Thoughts?
> >
> > -Jay
> >
>

Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

Posted by Gwen Shapira <gs...@cloudera.com>.
Looks good to me.

I like the idea of not blocking additional sends but not guaranteeing that
flush() will deliver them.

I assume that with linger.ms = 0, flush will just be a no-op (since the
queue will be empty). Is that correct?

Gwen

On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <ja...@gmail.com> wrote:

> Following up on our previous thread on making batch send a little easier,
> here is a concrete proposal to add a flush() method to the producer:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
>
> A proposed implementation is here:
> https://issues.apache.org/jira/browse/KAFKA-1865
>
> Thoughts?
>
> -Jay
>
