Posted to users@kafka.apache.org by Otis Gospodnetic <ot...@gmail.com> on 2015/02/02 20:08:43 UTC

New Producer - ONLY sync mode?

Hi,

Is the plan for New Producer to have ONLY async mode?  I'm asking because
of this info from the Wiki:


   - The producer will always attempt to batch data and will always
   immediately return a SendResponse which acts as a Future to allow the
   client to await the completion of the request.


The word "always" makes me think there will be no sync mode.

Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/

Re: New Producer - ONLY sync mode?

Posted by Jay Kreps <ja...@gmail.com>.
I implemented the flush() call I hypothesized earlier in this thread as a
patch on KAFKA-1865.

So now
  producer.flush()
will block until all buffered requests complete. The post condition is that
all previous send futures are satisfied and have error/offset information.
This is a little easier to use than keeping a list of all the futures and
also probably a bit more efficient. It also immediately unblocks all
requests regardless of the linger.ms setting, which is in keeping with the
name I think and important for the use case we described.

Code here:
https://issues.apache.org/jira/browse/KAFKA-1865

-Jay
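
The post condition described above can be illustrated with a small,
self-contained sketch. FlushSketch is a hypothetical stand-in written only
for this note, not the Kafka producer: its send() returns a
CompletableFuture<Long> standing in for the offset, and its flush() merely
simulates the acknowledgements arriving. The real patch is the one attached
to KAFKA-1865.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Stand-in for illustration only; this is NOT the Kafka producer class.
class FlushSketch {
    // Futures for records that were buffered but not yet acknowledged.
    private final List<CompletableFuture<Long>> pending = new ArrayList<>();
    private long nextOffset = 0;

    // Buffers a record and immediately returns a future for its offset.
    synchronized CompletableFuture<Long> send(String record) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        pending.add(result);
        return result;
    }

    // Simulates every buffered request being acknowledged, then returns.
    // Post condition: all futures from earlier send() calls are done.
    synchronized void flush() {
        for (CompletableFuture<Long> result : pending) {
            result.complete(nextOffset++);
        }
        pending.clear();
    }

    public static void main(String[] args) {
        FlushSketch producer = new FlushSketch();
        List<CompletableFuture<Long>> sent = new ArrayList<>();
        for (String record : new String[] {"a", "b", "c"}) {
            sent.add(producer.send(record));
        }
        producer.flush();
        for (CompletableFuture<Long> result : sent) {
            System.out.println("done=" + result.isDone() + " offset=" + result.join());
        }
    }
}
```

The point of the sketch is only the ordering guarantee: nothing is complete
before flush() and everything sent earlier is complete after it, so the
caller does not need to keep its own list of futures unless it wants the
per-record results.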

On Sat, Feb 7, 2015 at 1:24 PM, Jay Kreps <ja...@gmail.com> wrote:

> Hey Otis,
>
> Yeah, Gwen is correct. The future from the send will be satisfied when the
> response is received so it will be exactly the same as the performance of
> the sync producer previously.
>
> -Jay
>
> On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira <gs...@cloudera.com>
> wrote:
>
>> If I understood the code and Jay correctly - if you wait for the
>> future it will be a similar delay to that of the old sync producer.
>>
>> Put another way, if you test it out and see longer delays than the
>> sync producer had, we need to find out why and fix it.
>>
>> Gwen
>>
>> On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
>> <ot...@gmail.com> wrote:
>> > Hi,
>> >
>> > Nope, unfortunately it can't do that.  X is a remote app, doesn't listen
>> > to anything external, calls Y via HTTPS.  So X has to decide what to do
>> > with its data based on Y's synchronous response.  It has to block until Y
>> > responds.  And it wouldn't be pretty, I think, because nobody wants to run
>> > apps that talk to remote servers and hang on to connections more than they
>> > have to.  But perhaps that is the only way?  Or maybe the answer to "I'm
>> > guessing the delay would be more or less the same as if the Producer was
>> > using SYNC mode?" is YES, in which case the connection from X to Y would
>> > be open for just as long as with a SYNC producer running in Y?
>> >
>> > Thanks,
>> > Otis
>> > --
>> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> > Solr & Elasticsearch Support * http://sematext.com/
>> >
>> >
>> > On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira <gs...@cloudera.com>
>> wrote:
>> >
>> >> Can Y have a callback that will handle the notification to X?
>> >> In this case, perhaps Y can be async and X can buffer the data until
>> >> the callback triggers and says "all good" (or resend if the callback
>> >> indicates an error)
>> >>
>> >> On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
>> >> <ot...@gmail.com> wrote:
>> >> > Hi,
>> >> >
>> >> > Thanks for the info.  Here's the use case.  We have something upstream
>> >> > sending data, say a log shipper called X.  It sends it to some remote
>> >> > component Y.  Y is the Kafka Producer and it puts data into Kafka.  But
>> >> > Y needs to send a reply to X and tell it whether it successfully put all
>> >> > its data into Kafka.  If it did not, Y wants to tell X to buffer data
>> >> > locally and resend it later.
>> >> >
>> >> > If producer is ONLY async, Y can't easily do that.  Or maybe Y would
>> >> > just need to wait for the Future to come back and only then send the
>> >> > response back to X?  If so, I'm guessing the delay would be more or less
>> >> > the same as if the Producer was using SYNC mode?
>> >> >
>> >> > Thanks,
>> >> > Otis
>> >> > --
>> >> > Monitoring * Alerting * Anomaly Detection * Centralized Log
>> Management
>> >> > Solr & Elasticsearch Support * http://sematext.com/
>> >> >
>> >> >
>> >> > On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps <ja...@gmail.com>
>> wrote:
>> >> >
>> >> >> Yeah as Gwen says there is no sync/async mode anymore. There is a new
>> >> >> configuration which does a lot of what async did in terms of allowing
>> >> >> batching:
>> >> >>
>> >> >> batch.size - This is the target amount of data per partition the
>> >> >> producer will attempt to batch together.
>> >> >> linger.ms - This is the time the producer will wait for more data to be
>> >> >> sent to better batch up writes. The default is 0 (send immediately). So
>> >> >> if you set this to 50 ms the client will send immediately if it has
>> >> >> already filled up its batch, otherwise it will wait up to 50 ms to
>> >> >> accumulate the number of bytes given by batch.size.
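
As a rough illustration of the two settings just described, the sketch below
builds the batching part of a producer configuration with plain
java.util.Properties. The keys batch.size and linger.ms are the ones named
in the message; the values 16384 and 50, and the class and method names, are
assumptions picked for the example.

```java
import java.util.Properties;

// Hypothetical helper; the class and method names are illustrative only.
class BatchingConfigSketch {
    // Builds the batching-related part of a producer configuration.
    static Properties batchingProps() {
        Properties props = new Properties();
        // Target number of bytes to collect per partition (assumed value).
        props.setProperty("batch.size", "16384");
        // Wait up to 50 ms for more records when a batch is not yet full.
        props.setProperty("linger.ms", "50");
        return props;
    }

    public static void main(String[] args) {
        Properties props = batchingProps();
        System.out.println("batch.size=" + props.getProperty("batch.size"));
        System.out.println("linger.ms=" + props.getProperty("linger.ms"));
    }
}
```

A real configuration would also need connection and serializer settings,
which are left out here because the thread does not discuss them.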
>> >> >>
>> >> >> To send asynchronously you do
>> >> >>    producer.send(record)
>> >> >> whereas to block on a response you do
>> >> >>    producer.send(record).get();
>> >> >> which will wait for acknowledgement from the server.
>> >> >>
>> >> >> One advantage of this model is that the client will do its best to
>> >> >> batch under the covers even if linger.ms=0. It will do this by batching
>> >> >> any data that arrives while another send is in progress into a single
>> >> >> request--giving a kind of "group commit" effect.
>> >> >>
>> >> >> The hope is that this will be both simpler to understand (a single api
>> >> >> that always works the same) and more powerful (you always get a response
>> >> >> with error and offset information whether or not you choose to use it).
>> >> >>
>> >> >> -Jay
>> >> >>
>> >> >>
>> >> >> On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira <gshapira@cloudera.com>
>> >> >> wrote:
>> >> >>
>> >> >> > If you want to emulate the old sync producer behavior, you need to set
>> >> >> > the batch size to 1 (in producer config) and wait on the future you
>> >> >> > get from Send (i.e. future.get)
>> >> >> >
>> >> >> > I can't think of good reasons to do so, though.
>> >> >> >
>> >> >> > Gwen
>> >> >> >
>> >> >> >

Re: New Producer - ONLY sync mode?

Posted by Jay Kreps <ja...@gmail.com>.
Hey Otis,

Yeah, Gwen is correct. The future from the send will be satisfied when the
response is received so it will be exactly the same as the performance of
the sync producer previously.

-Jay
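
For the X and Y use case discussed in this thread, waiting on the future
might look roughly like the sketch below. It uses only java.util.concurrent
types: the Future<Long> stands in for the future returned by send() (the
Long plays the role of the offset), and the class name, method name, and
reply strings are assumptions made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical sketch of how Y could turn a send result into a reply to X.
class SyncReplySketch {
    // Blocks until the send future completes and returns Y's reply to X.
    static String replyFor(Future<Long> sendResult) {
        try {
            long offset = sendResult.get(); // blocks until acknowledged or failed
            return "OK offset=" + offset;
        } catch (ExecutionException e) {
            // The send failed; X should buffer locally and resend later.
            return "RETRY " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return "RETRY interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(replyFor(CompletableFuture.completedFuture(42L)));
        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("broker unavailable"));
        System.out.println(replyFor(failed));
    }
}
```

The connection from X to Y stays open only for as long as get() blocks,
which is the delay being compared with the old sync producer.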


Re: New Producer - ONLY sync mode?

Posted by Steve Morin <st...@stevemorin.com>.
Jay,
Thanks I'll look at that more closely.

On Sat, Feb 7, 2015 at 1:23 PM, Jay Kreps <ja...@gmail.com> wrote:

> Steve
>
> In terms of mimicking the sync behavior, I think that is what .get() does,
> no?
>
> We are always returning the offset and error information. The example I
> gave didn't make use of it, but you definitely can make use of it if you
> want to.
>
> -Jay
>
> On Wed, Feb 4, 2015 at 9:58 AM, Steve Morin <st...@stevemorin.com> wrote:
>
> > Looking at this thread, I would ideally want at least the right recipe
> > to mimic sync behavior like Otis is talking about.
> >
> > In the second case, I would like to be able to know individually whether
> > messages have failed, regardless of whether they are in separate batches,
> > sort of like what Kinesis does as Pradeep mentioned.
> > -Steve
> >
> > On Wed, Feb 4, 2015 at 11:19 AM, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > Yeah totally. Using a callback is, of course, the Right Thing for this
> > > kind of stuff. But I have found that kind of asynchronous thinking can be
> > > hard for people. Even if you get out of the pre-Java 8 syntactic pain that
> > > anonymous inner classes inflict, just dealing with multiple threads of
> > > control without creating async spaghetti can be a challenge for complex
> > > stuff. That is really the only reason for the futures in the api; they
> > > are strictly less powerful than the callbacks, but at least using them you
> > > can just call .get() and pretend it is blocking.
> > >
> > > -Jay
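
The callback style mentioned above can be sketched without blocking at all.
In the sketch below, send() is a hypothetical stand-in that invokes its
callback synchronously and "fails" any record containing the text "bad";
the BiConsumer<Long, Exception> only approximates the shape of a completion
callback (a result or an error) and is not the Kafka Callback interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch of per-record bookkeeping driven by callbacks.
class CallbackSketch {
    // Stand-in for a producer send: fails any record containing "bad".
    static void send(String record, BiConsumer<Long, Exception> callback) {
        if (record.contains("bad")) {
            callback.accept(null, new RuntimeException("send failed"));
        } else {
            callback.accept(0L, null); // the offset here is a placeholder
        }
    }

    // Sends every record and returns those whose callback reported an error.
    static List<String> sendAll(List<String> records) {
        // A real producer may invoke callbacks on another thread, in which
        // case a thread-safe collection would be needed here.
        List<String> toRetry = new ArrayList<>();
        for (String record : records) {
            send(record, (offset, error) -> {
                if (error != null) {
                    toRetry.add(record);
                }
            });
        }
        return toRetry;
    }

    public static void main(String[] args) {
        List<String> toRetry = sendAll(List.of("a", "bad-1", "b", "bad-2"));
        System.out.println("retry: " + toRetry);
    }
}
```

Because each record gets its own callback, only the records that actually
failed end up in the retry list.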
> > >
> > > > On Wed, Feb 4, 2015 at 7:19 AM, Joe Stein <jo...@stealth.ly> wrote:
> > > >
> > > > > Now that 0.8.2.0 is in the wild I look forward to working with it more
> > > > > and seeing what folks start to do with this function
> > > > >
> > > > > https://dist.apache.org/repos/dist/release/kafka/0.8.2.0/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback)
> > > > >
> > > > > and keeping it fully non-blocking.
> > > > >
> > > > > One sprint I know of coming up is going to have the new producer as a
> > > > > component in their reactive calls and handling bookkeeping and retries
> > > > > through that type of callback approach. Should work well (haven't tried
> > > > > but don't see why not) with Akka, ScalaZ, RxJava, Finagle, etc, etc, etc
> > > > > in functional languages and frameworks.
> > > > >
> > > > > I think as JDK 8 starts to get out in the wild more too (maybe after JDK 7
> > > > > EOL) the use of .get will be reduced (imho) and folks will be thinking
> > > > > more about non-blocking vs blocking and not so much sync vs async, but my
> > > > > crystal ball is just back from the shop so we'll see =8^)
> > > >
> > > > /*******************************************
> > > >  Joe Stein
> > > >  Founder, Principal Consultant
> > > >  Big Data Open Source Security LLC
> > > >  http://www.stealth.ly
> > > >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > > ********************************************/
> > > >
> > > > > On Tue, Feb 3, 2015 at 10:45 PM, Jay Kreps <ja...@gmail.com> wrote:
> > > > >
> > > > > > Hey guys,
> > > > > >
> > > > > > I guess the question is whether it really matters how many underlying
> > > > > > network requests occur? It is very hard for an application to depend on
> > > > > > this even in the old producer since it depends on the partitions
> > > > > > placement (a send to two partitions may go to either one machine or two
> > > > > > and so it will send either one or two requests). So when you send a batch
> > > > > > in one call you may feel that is "all at once", but that is only actually
> > > > > > guaranteed if all messages have the same partition.
> > > > > >
> > > > > > The challenge is allowing even this in the presence of bounded request
> > > > > > sizes which we have in the new producer. The user sends a list of objects
> > > > > > and the serialized size that will result is not very apparent to them. If
> > > > > > you break it up into multiple requests then that is kind of further
> > > > > > ruining the illusion of a single send. If you don't then you have to just
> > > > > > error out which is equally annoying to have to handle.
> > > > > >
> > > > > > But I'm not sure if from your description you are saying you actually
> > > > > > care how many physical requests are issued. I think it is more like it is
> > > > > > just syntactically annoying to send a batch of data now because it needs
> > > > > > a for loop.
> > > > > >
> > > > > > Currently to do this you would do:
> > > > >
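> > > > > > // K and V stand for the producer's key and value types.
> > > > > > List<Future<RecordMetadata>> responses = new ArrayList<>();
> > > > > > for (ProducerRecord<K, V> input : recordBatch)
> > > > > >     responses.add(producer.send(input));
> > > > > > for (Future<RecordMetadata> response : responses)
> > > > > >     response.get();  // blocks until this record is acknowledged or fails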
> > > > >
> > > > > > If you don't depend on the offset/error info we could add a flush call
> > > > > > so you could instead do
> > > > > > for (ProducerRecord<K, V> input : recordBatch)
> > > > > >     producer.send(input);
> > > > > > producer.flush();
> > > > > >
> > > > > > But if you do want the error/offset then you are going to be back to the
> > > > > > original case.
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > -Jay
> > > > >
> > > > > > On Mon, Feb 2, 2015 at 1:48 PM, Gwen Shapira <gshapira@cloudera.com>
> > > > > > wrote:
> > > > > >
> > > > > > > I've been thinking about that too, since both Flume and Sqoop rely on
> > > > > > > the send(List) call of the old API.
> > > > > > >
> > > > > > > I'd like to see this API come back, but I'm debating how we'd handle
> > > > > > > errors. IIRC, the old API would fail an entire batch on a single
> > > > > > > error, which can lead to duplicates. Having N callbacks lets me retry
> > > > > > > / save / whatever just the messages that had issues.
> > > > > > >
> > > > > > > If messages had identifiers from the producer side, we could have the
> > > > > > > API call the callback with a list of message-ids and their status. But
> > > > > > > they don't :)
> > > > > >
> > > > > > Any thoughts on how you'd like it to work?
> > > > > >
> > > > > > Gwen
> > > > > >
> > > > > >
> > > > > > > On Mon, Feb 2, 2015 at 1:38 PM, Pradeep Gollakota <pradeepg26@gmail.com>
> > > > > > > wrote:
> > > > > > > > This is a great question Otis. Like Gwen said, you can accomplish Sync
> > > > > > > > mode by setting the batch size to 1. But this does highlight a
> > > > > > > > shortcoming of the new producer API.
> > > > > > > >
> > > > > > > > I really like the design of the new API and it has really great
> > > > > > > > properties and I'm enjoying working with it. However, one API that I
> > > > > > > > think we're lacking is a "batch" API. Currently, I have to iterate over
> > > > > > > > a batch and call .send() on each record, which returns n callbacks
> > > > > > > > instead of 1 callback for the whole batch. This significantly
> > > > > > > > complicates recovery logic where we need to commit a batch as opposed
> > > > > > > > to 1 record at a time.
> > > > > > > >
> > > > > > > > Do you guys have any plans to add better semantics around batches?
> > > > > > >

Re: New Producer - ONLY sync mode?

Posted by Jay Kreps <ja...@gmail.com>.
Steve

In terms of mimicking the sync behavior, I think that is what .get() does,
no?

We are always returning the offset and error information. The example I
gave didn't make use of it, but you definitely can make use of it if you
want to.

-Jay
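
To make use of the per-record results mentioned above, one option is to keep
the futures from a batch of sends and inspect each one afterwards. The
sketch below uses only java.util.concurrent types; the Future<Long> stands
in for the future returned by send(), and the class and method names are
assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical sketch: find out which records of a batch failed.
class BatchResultSketch {
    // Waits on each future in order and returns the indexes of failed sends.
    static List<Integer> failedIndexes(List<Future<Long>> responses) {
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < responses.size(); i++) {
            try {
                responses.get(i).get(); // blocks until record i is resolved
            } catch (ExecutionException e) {
                failed.add(i); // record i failed; the cause is e.getCause()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        CompletableFuture<Long> bad = new CompletableFuture<>();
        bad.completeExceptionally(new RuntimeException("send failed"));
        List<Future<Long>> responses = new ArrayList<>();
        responses.add(CompletableFuture.completedFuture(0L));
        responses.add(bad);
        responses.add(CompletableFuture.completedFuture(1L));
        System.out.println("failed indexes: " + failedIndexes(responses));
    }
}
```

Since the futures are collected in send order, an index in the result maps
straight back to the record that needs to be retried.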

On Wed, Feb 4, 2015 at 9:58 AM, Steve Morin <st...@stevemorin.com> wrote:

> Looking at this thread I would ideally want something at least the right
> recipe to mimic sync behavior like Otis is talking about.
>
> In the second case, would like to be able to individually know if messages
> have failed even regardless if they are in separate batches, sort of like
> what Kinesis does as Pradeep mentioned.
> -Steve
>
> On Wed, Feb 4, 2015 at 11:19 AM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Yeah totally. Using a callback is, of course, the Right Thing for this
> kind
> > of stuff. But I have found that kind of asynchronous thinking can be hard
> > for people. Even if you get out of the pre-java 8 syntactic pain that
> > anonymous inner classes inflict just dealing with multiple threads of
> > control without creating async spaghetti can be a challenge for complex
> > stuff. That is really the only reason for the futures in the api, they
> are
> > strictly less powerful than the callbacks, but at least using them you
> can
> > just call .get() and pretend it is blocking.
> >
> > -Jay
> >
> > On Wed, Feb 4, 2015 at 7:19 AM, Joe Stein <jo...@stealth.ly> wrote:
> >
> > > Now that 0.8.2.0 is in the wild I look forward to working with more and
> > > seeing what folks start to-do with this function
> > >
> > >
> >
> https://dist.apache.org/repos/dist/release/kafka/0.8.2.0/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord
> > > ,
> > > org.apache.kafka.clients.producer.Callback) and keeping it fully non
> > > blocking.
> > >
> > > One sprint I know of coming up is going to have the new producer as a
> > > component in their reactive calls and handling bookkeeping and retries
> > > through that type of call back approach. Should work well (haven't
> tried
> > > but don't see why not) with Akka, ScalaZ, RxJava, Finagle, etc, etc,
> etc
> > in
> > > functional languages and frameworks.
> > >
> > > I think as JDK 8 starts to get out in the wild too more (may after jdk7
> > > eol) the use of .get will be reduced (imho) and folks will be thinking
> > more
> > > about non-blocking vs blocking and not as so much sync vs async but my
> > > crystal ball just back from the shop so well see =8^)
> > >
> > > /*******************************************
> > >  Joe Stein
> > >  Founder, Principal Consultant
> > >  Big Data Open Source Security LLC
> > >  http://www.stealth.ly
> > >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > ********************************************/
> > >
> > > On Tue, Feb 3, 2015 at 10:45 PM, Jay Kreps <ja...@gmail.com> wrote:
> > >
> > > > Hey guys,
> > > >
> > > > I guess the question is whether it really matters how many underlying
> > > > network requests occur? It is very hard for an application to depend on
> > > > this even in the old producer since it depends on the partition placement
> > > > (a send to two partitions may go to either one machine or two and so it
> > > > will send either one or two requests). So when you send a batch in one call
> > > > you may feel that is "all at once", but that is only actually guaranteed if
> > > > all messages have the same partition.
> > > >
> > > > The challenge is allowing even this in the presence of bounded request
> > > > sizes, which we have in the new producer. The user sends a list of objects
> > > > and the serialized size that will result is not very apparent to them. If
> > > > you break it up into multiple requests then that is kind of further ruining
> > > > the illusion of a single send. If you don't then you have to just error out,
> > > > which is equally annoying to have to handle.
> > > >
> > > > But I'm not sure if from your description you are saying you actually care
> > > > how many physical requests are issued. I think it is more like it is just
> > > > syntactically annoying to send a batch of data now because it needs a for
> > > > loop.
> > > >
> > > > Currently to do this you would do:
> > > >
> > > > List<Future<RecordMetadata>> responses = new ArrayList<>();
> > > > for (ProducerRecord<K, V> input : recordBatch)
> > > >     responses.add(producer.send(input));
> > > > for (Future<RecordMetadata> response : responses)
> > > >     response.get();
> > > >
> > > > If you don't depend on the offset/error info we could add a flush call so
> > > > you could instead do
> > > >
> > > > for (ProducerRecord<K, V> input : recordBatch)
> > > >     producer.send(input);
> > > > producer.flush();
> > > >
> > > > But if you do want the error/offset then you are going to be back to the
> > > > original case.
> > > >
> > > > Thoughts?
> > > >
> > > > -Jay
> > > >
> > > > On Mon, Feb 2, 2015 at 1:48 PM, Gwen Shapira <gs...@cloudera.com> wrote:
> > > >
> > > > > I've been thinking about that too, since both Flume and Sqoop rely on
> > > > > the send(List) API of the old producer.
> > > > >
> > > > > I'd like to see this API come back, but I'm debating how we'd handle
> > > > > errors. IIRC, the old API would fail an entire batch on a single
> > > > > error, which can lead to duplicates. Having N callbacks lets me retry
> > > > > / save / whatever just the messages that had issues.
> > > > >
> > > > > If messages had identifiers from the producer side, we could have the
> > > > > API call the callback with a list of message-ids and their status. But
> > > > > they don't :)
> > > > >
> > > > > Any thoughts on how you'd like it to work?
> > > > >
> > > > > Gwen
> > > > >
> > > > >
> > > > > > On Mon, Feb 2, 2015 at 1:38 PM, Pradeep Gollakota <pradeepg26@gmail.com> wrote:
> > > > > > > This is a great question Otis. Like Gwen said, you can accomplish sync
> > > > > > > mode by setting the batch size to 1. But this does highlight a
> > > > > > > shortcoming of the new producer API.
> > > > > > >
> > > > > > > I really like the design of the new API; it has really great
> > > > > > > properties and I'm enjoying working with it. However, one API that I
> > > > > > > think we're lacking is a "batch" API. Currently, I have to iterate over
> > > > > > > a batch and call .send() on each record, which returns n callbacks
> > > > > > > instead of 1 callback for the whole batch. This significantly
> > > > > > > complicates recovery logic where we need to commit a batch as opposed
> > > > > > > to 1 record at a time.
> > > > > > >
> > > > > > > Do you guys have any plans to add better semantics around batches?
> > > > > >
> > > > > > > On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira <gshapira@cloudera.com> wrote:
> > > > > > >
> > > > > > >> If I understood the code and Jay correctly - if you wait for the
> > > > > > >> future it will be a similar delay to that of the old sync producer.
> > > > > > >>
> > > > > > >> Put another way, if you test it out and see longer delays than the
> > > > > > >> sync producer had, we need to find out why and fix it.
> > > > > >>
> > > > > >> Gwen
> > > > > >>
> > > > > >> On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
> > > > > >> <ot...@gmail.com> wrote:
> > > > > > >> > Hi,
> > > > > > >> >
> > > > > > >> > Nope, unfortunately it can't do that.  X is a remote app, doesn't
> > > > > > >> > listen to anything external, calls Y via HTTPS.  So X has to decide
> > > > > > >> > what to do with its data based on Y's synchronous response.  It has
> > > > > > >> > to block until Y responds.  And it wouldn't be pretty, I think,
> > > > > > >> > because nobody wants to run apps that talk to remote servers and
> > > > > > >> > hang on to connections more than they have to.  But perhaps that is
> > > > > > >> > the only way?  Or maybe the answer to "I'm guessing the delay would
> > > > > > >> > be more or less the same as if the Producer was using SYNC mode?" is
> > > > > > >> > YES, in which case the connection from X to Y would be open for just
> > > > > > >> > as long as with a SYNC producer running in Y?
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> > Otis
> > > > > >> > --
> > > > > >> > Monitoring * Alerting * Anomaly Detection * Centralized Log
> > > > Management
> > > > > >> > Solr & Elasticsearch Support * http://sematext.com/
> > > > > >> >
> > > > > >> >
> > > > > > >> > On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira <gshapira@cloudera.com> wrote:
> > > > > > >> >
> > > > > > >> >> Can Y have a callback that will handle the notification to X?
> > > > > > >> >> In this case, perhaps Y can be async and X can buffer the data
> > > > > > >> >> until the callback triggers and says "all good" (or resend if the
> > > > > > >> >> callback indicates an error)
> > > > > >> >>
> > > > > >> >> On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
> > > > > >> >> <ot...@gmail.com> wrote:
> > > > > >> >> > Hi,
> > > > > >> >> >
> > > > > > >> >> > Thanks for the info.  Here's the use case.  We have something
> > > > > > >> >> > upstream sending data, say a log shipper called X.  It sends it to
> > > > > > >> >> > some remote component Y.  Y is the Kafka Producer and it puts data
> > > > > > >> >> > into Kafka.  But Y needs to send a reply to X and tell it whether
> > > > > > >> >> > it successfully put all its data into Kafka.  If it did not, Y
> > > > > > >> >> > wants to tell X to buffer data locally and resend it later.
> > > > > > >> >> >
> > > > > > >> >> > If producer is ONLY async, Y can't easily do that.  Or maybe Y
> > > > > > >> >> > would just need to wait for the Future to come back and only then
> > > > > > >> >> > send the response back to X?  If so, I'm guessing the delay would
> > > > > > >> >> > be more or less the same as if the Producer was using SYNC mode?
> > > > > >> >> >
> > > > > >> >> > Thanks,
> > > > > >> >> > Otis
> > > > > >> >> > --
> > > > > >> >> > Monitoring * Alerting * Anomaly Detection * Centralized Log
> > > > > Management
> > > > > >> >> > Solr & Elasticsearch Support * http://sematext.com/
> > > > > >> >> >
> > > > > >> >> >
> > > > > > >> >> > On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> > > > > > >> >> >
> > > > > > >> >> >> Yeah as Gwen says there is no sync/async mode anymore. There is
> > > > > > >> >> >> a new configuration which does a lot of what async did in terms
> > > > > > >> >> >> of allowing batching:
> > > > > > >> >> >>
> > > > > > >> >> >> batch.size - This is the target amount of data per partition the
> > > > > > >> >> >> producer will attempt to batch together.
> > > > > > >> >> >> linger.ms - This is the time the producer will wait for more
> > > > > > >> >> >> data to be sent to better batch up writes. The default is 0
> > > > > > >> >> >> (send immediately). So if you set this to 50 ms the client will
> > > > > > >> >> >> send immediately if it has already filled up its batch,
> > > > > > >> >> >> otherwise it will wait up to 50 ms to accumulate the number of
> > > > > > >> >> >> bytes given by batch.size.
> > > > > > >> >> >>
> > > > > > >> >> >> To send asynchronously you do
> > > > > > >> >> >>    producer.send(record);
> > > > > > >> >> >> whereas to block on a response you do
> > > > > > >> >> >>    producer.send(record).get();
> > > > > > >> >> >> which will wait for acknowledgement from the server.
> > > > > > >> >> >>
> > > > > > >> >> >> One advantage of this model is that the client will do its best
> > > > > > >> >> >> to batch under the covers even if linger.ms=0. It will do this
> > > > > > >> >> >> by batching any data that arrives while another send is in
> > > > > > >> >> >> progress into a single request--giving a kind of "group commit"
> > > > > > >> >> >> effect.
> > > > > > >> >> >>
> > > > > > >> >> >> The hope is that this will be both simpler to understand (a
> > > > > > >> >> >> single API that always works the same) and more powerful (you
> > > > > > >> >> >> always get a response with error and offset information whether
> > > > > > >> >> >> or not you choose to use it).
> > > > > > >> >> >>
> > > > > > >> >> >> -Jay
> > > > > >> >> >>
> > > > > >> >> >>
> > > > > > >> >> >> On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira <gshapira@cloudera.com> wrote:
> > > > > > >> >> >>
> > > > > > >> >> >> > If you want to emulate the old sync producer behavior, you
> > > > > > >> >> >> > need to set the batch size to 1 (in producer config) and wait
> > > > > > >> >> >> > on the future you get from send() (i.e. future.get())
> > > > > > >> >> >> >
> > > > > > >> >> >> > I can't think of good reasons to do so, though.
> > > > > >> >> >> >
> > > > > >> >> >> > Gwen
> > > > > >> >> >> >
> > > > > >> >> >> >
> > > > > >> >> >> > On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic
> > > > > >> >> >> > <ot...@gmail.com> wrote:
> > > > > >> >> >> > > Hi,
> > > > > >> >> >> > >
> > > > > > >> >> >> > > Is the plan for New Producer to have ONLY async mode?  I'm
> > > > > > >> >> >> > > asking because of this info from the Wiki:
> > > > > > >> >> >> > >
> > > > > > >> >> >> > >
> > > > > > >> >> >> > >    - The producer will always attempt to batch data and will
> > > > > > >> >> >> > >    always immediately return a SendResponse which acts as a
> > > > > > >> >> >> > >    Future to allow the client to await the completion of the
> > > > > > >> >> >> > >    request.
> > > > > > >> >> >> > >
> > > > > > >> >> >> > >
> > > > > > >> >> >> > > The word "always" makes me think there will be no sync mode.
> > > > > >> >> >> > >
> > > > > >> >> >> > > Thanks,
> > > > > >> >> >> > > Otis
> > > > > >> >> >> > > --
> > > > > >> >> >> > > Monitoring * Alerting * Anomaly Detection *
> Centralized
> > > Log
> > > > > >> >> Management
> > > > > >> >> >> > > Solr & Elasticsearch Support * http://sematext.com/
> > > > > >> >> >> >
> > > > > >> >> >>
> > > > > >> >>
> > > > > >>
> > > > >
> > > >
> > >
> >
>

Re: New Producer - ONLY sync mode?

Posted by Steven Wu <st...@gmail.com>.
This is the JIRA regarding blocking on metadata:
https://issues.apache.org/jira/browse/KAFKA-1835

I am less concerned about the first-time blocking. I am more concerned
about the situation when the Kafka cluster/brokers are completely down. In
that case we can screw up the producer apps. I hope that we can take care of
the last mile and make it truly/fully non-blocking :)
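
Until that is addressed, one possible stopgap on the application side is to
keep a call that might block off the caller's thread and bound how long the
caller waits for it. The sketch below is only an illustration under stated
assumptions: BoundedSendSketch and trySend are hypothetical names, the
Callable stands in for whatever producer call may block, and it assumes the
blocked call responds to thread interruption. It does not make the producer
itself non-blocking.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: bounds how long a caller waits on a possibly blocking send. */
final class BoundedSendSketch {

    // Single daemon thread that runs the possibly blocking calls.
    private static final ExecutorService SEND_THREAD =
            Executors.newSingleThreadExecutor(runnable -> {
                Thread thread = new Thread(runnable, "bounded-send");
                thread.setDaemon(true);
                return thread;
            });

    /**
     * Runs the given call on the send thread and waits at most timeoutMs.
     * Returns true if it completed in time, false if it timed out or failed.
     */
    static <T> boolean trySend(Callable<T> possiblyBlockingSend, long timeoutMs) {
        Future<T> attempt = SEND_THREAD.submit(possiblyBlockingSend);
        try {
            attempt.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            attempt.cancel(true); // interrupt the stuck call so the thread is freed
            return false;
        } catch (ExecutionException e) {
            return false; // the call itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // A call that returns quickly, and one that simulates brokers being down.
        System.out.println(trySend(() -> "sent", 1000));
        System.out.println(trySend(() -> { Thread.sleep(5000); return "late"; }, 100));
    }
}
```

With a single send thread, a call that ignores interruption would still stall
every later send, so this only limits the damage to the calling thread.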




On Wed, Feb 4, 2015 at 9:58 AM, Steve Morin <st...@stevemorin.com> wrote:

> Looking at this thread, I would ideally want at least the right
> recipe to mimic sync behavior like Otis is talking about.
>
> In the second case, I would like to be able to know individually whether
> messages have failed, regardless of whether they are in separate batches,
> sort of like what Kinesis does, as Pradeep mentioned.
> -Steve
>
> On Wed, Feb 4, 2015 at 11:19 AM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Yeah totally. Using a callback is, of course, the Right Thing for this kind
> > of stuff. But I have found that kind of asynchronous thinking can be hard
> > for people. Even if you get out of the pre-Java 8 syntactic pain that
> > anonymous inner classes inflict, just dealing with multiple threads of
> > control without creating async spaghetti can be a challenge for complex
> > stuff. That is really the only reason for the futures in the API: they are
> > strictly less powerful than the callbacks, but at least using them you can
> > just call .get() and pretend it is blocking.
> >
> > -Jay
> >

Re: New Producer - ONLY sync mode?

Posted by Gwen Shapira <gs...@cloudera.com>.
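I thought Jay Kreps had the right recipes:

To mimic the old sync producer:
 producer.send(record).get();

To mimic old batches:
// Send everything first so the producer can batch, then wait on each future.
List<Future<RecordMetadata>> responses = new ArrayList<>();
for (ProducerRecord<K, V> input : recordBatch)
    responses.add(producer.send(input));
for (Future<RecordMetadata> response : responses)
    response.get();  // throws ExecutionException if that record failed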

Perhaps we need to add this to the FAQ?
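
For such an FAQ entry, a slightly fuller version of the batch recipe might
also report which records failed, which is what Steve asked about. This is
only a sketch under stated assumptions: BatchSendSketch and sendAll are
hypothetical names, and the Function argument is a stand-in for
producer::send so the example runs without a broker. With the real producer
the function would presumably be something like record ->
producer.send(record).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical helper: send a whole batch, then report which records failed. */
final class BatchSendSketch {

    /**
     * Hands every record to `send` first (so the sender is free to batch),
     * then waits on each future and returns the indexes of failed records.
     */
    static <R, M> List<Integer> sendAll(List<R> batch, Function<R, Future<M>> send) {
        List<Future<M>> responses = new ArrayList<>();
        for (R record : batch) {
            responses.add(send.apply(record));
        }
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < responses.size(); i++) {
            try {
                responses.get(i).get(); // blocks until this record is acknowledged
            } catch (ExecutionException e) {
                failed.add(i); // this record failed; the caller can retry or save it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for sends", e);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // Fake sender for the demo: records starting with "bad" are rejected.
        Function<String, Future<Long>> fakeSend = record -> {
            CompletableFuture<Long> result = new CompletableFuture<>();
            if (record.startsWith("bad")) {
                result.completeExceptionally(new RuntimeException("rejected " + record));
            } else {
                result.complete((long) record.length());
            }
            return result;
        };
        List<Integer> failed = sendAll(Arrays.asList("a", "bad-1", "c"), fakeSend);
        System.out.println("failed indexes: " + failed); // prints: failed indexes: [1]
    }
}
```

Because each record keeps its own future, only the failed indexes need to be
retried, rather than failing the whole batch on a single error.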

Gwen


On Wed, Feb 4, 2015 at 9:58 AM, Steve Morin <st...@stevemorin.com> wrote:

> Looking at this thread, I would ideally want at least the right
> recipe to mimic sync behavior like Otis is talking about.
>
> In the second case, I would like to be able to know individually whether
> messages have failed, regardless of whether they are in separate batches,
> sort of like what Kinesis does, as Pradeep mentioned.
> -Steve
>
> On Wed, Feb 4, 2015 at 11:19 AM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Yeah totally. Using a callback is, of course, the Right Thing for this kind
> > of stuff. But I have found that kind of asynchronous thinking can be hard
> > for people. Even if you get out of the pre-Java 8 syntactic pain that
> > anonymous inner classes inflict, just dealing with multiple threads of
> > control without creating async spaghetti can be a challenge for complex
> > stuff. That is really the only reason for the futures in the API: they are
> > strictly less powerful than the callbacks, but at least using them you can
> > just call .get() and pretend it is blocking.
> >
> > -Jay
> >
> > On Wed, Feb 4, 2015 at 7:19 AM, Joe Stein <jo...@stealth.ly> wrote:
> >
> > > Now that 0.8.2.0 is in the wild I look forward to working with it more and
> > > seeing what folks start to do with this function
> > >
> > > https://dist.apache.org/repos/dist/release/kafka/0.8.2.0/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback)
> > >
> > > and keeping it fully non-blocking.
> > >
> > > One sprint I know of coming up is going to have the new producer as a
> > > component in their reactive calls, handling bookkeeping and retries
> > > through that type of callback approach. It should work well (haven't tried
> > > but don't see why not) with Akka, ScalaZ, RxJava, Finagle, etc. in
> > > functional languages and frameworks.
> > >
> > > I think as JDK 8 gets out in the wild more (maybe after JDK 7 EOL) the
> > > use of .get will be reduced (IMHO) and folks will be thinking more
> > > about non-blocking vs. blocking and not so much about sync vs. async, but my
> > > crystal ball is just back from the shop so we'll see =8^)
> > >
> > > /*******************************************
> > >  Joe Stein
> > >  Founder, Principal Consultant
> > >  Big Data Open Source Security LLC
> > >  http://www.stealth.ly
> > >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > ********************************************/
> > >
> > > On Tue, Feb 3, 2015 at 10:45 PM, Jay Kreps <ja...@gmail.com> wrote:
> > >
> > > > Hey guys,
> > > >
> > > > I guess the question is whether it really matters how many underlying
> > > > network requests occur? It is very hard for an application to depend on
> > > > this even in the old producer since it depends on the partition placement
> > > > (a send to two partitions may go to either one machine or two and so it
> > > > will send either one or two requests). So when you send a batch in one call
> > > > you may feel that is "all at once", but that is only actually guaranteed if
> > > > all messages have the same partition.
> > > >
> > > > The challenge is allowing even this in the presence of bounded request
> > > > sizes, which we have in the new producer. The user sends a list of objects
> > > > and the serialized size that will result is not very apparent to them. If
> > > > you break it up into multiple requests then that is kind of further ruining
> > > > the illusion of a single send. If you don't then you have to just error out,
> > > > which is equally annoying to have to handle.
> > > >
> > > > But I'm not sure if from your description you are saying you actually care
> > > > how many physical requests are issued. I think it is more like it is just
> > > > syntactically annoying to send a batch of data now because it needs a for
> > > > loop.
> > > >
> > > > Currently to do this you would do:
> > > >
> > > > List<Future<RecordMetadata>> responses = new ArrayList<>();
> > > > for (ProducerRecord<K, V> input : recordBatch)
> > > >     responses.add(producer.send(input));
> > > > for (Future<RecordMetadata> response : responses)
> > > >     response.get();
> > > >
> > > > If you don't depend on the offset/error info we could add a flush call so
> > > > you could instead do
> > > >
> > > > for (ProducerRecord<K, V> input : recordBatch)
> > > >     producer.send(input);
> > > > producer.flush();
> > > >
> > > > But if you do want the error/offset then you are going to be back to the
> > > > original case.
> > > >
> > > > Thoughts?
> > > >
> > > > -Jay
> > > >
> > > > On Mon, Feb 2, 2015 at 1:48 PM, Gwen Shapira <gs...@cloudera.com> wrote:
> > > >
> > > > > I've been thinking about that too, since both Flume and Sqoop rely on
> > > > > the send(List) API of the old producer.
> > > > >
> > > > > I'd like to see this API come back, but I'm debating how we'd handle
> > > > > errors. IIRC, the old API would fail an entire batch on a single
> > > > > error, which can lead to duplicates. Having N callbacks lets me retry
> > > > > / save / whatever just the messages that had issues.
> > > > >
> > > > > If messages had identifiers from the producer side, we could have the
> > > > > API call the callback with a list of message-ids and their status. But
> > > > > they don't :)
> > > > >
> > > > > Any thoughts on how you'd like it to work?
> > > > >
> > > > > Gwen
> > > > >
> > > > >
> > > > > On Mon, Feb 2, 2015 at 1:38 PM, Pradeep Gollakota <
> > > pradeepg26@gmail.com>
> > > > > wrote:
> > > > > > This is a great question Otis. Like Gwen said, you can accomplish
> > > Sync
> > > > > mode
> > > > > > by setting the batch size to 1. But this does highlight a
> > shortcoming
> > > > of
> > > > > > the new producer API.
> > > > > >
> > > > > > I really like the design of the new API and it has really great
> > > > > properties
> > > > > > > and I'm enjoying working with it. However, one API that I think
> > > we're
> > > > > > lacking is a "batch" API. Currently, I have to iterate over a
> batch
> > > and
> > > > > > call .send() on each record, which returns n callbacks instead
> of 1
> > > > > > callback for the whole batch. This significantly complicates
> > recovery
> > > > > logic
> > > > > > > where we need to commit a batch as opposed to 1 record at a time.
> > > > > >
> > > > > > Do you guys have any plans to add better semantics around
> batches?
> > > > > >
> > > > > > On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira <
> > gshapira@cloudera.com>
> > > > > wrote:
> > > > > >
> > > > > >> If I understood the code and Jay correctly - if you wait for the
> > > > > >> future it will be a similar delay to that of the old sync
> > producer.
> > > > > >>
> > > > > >> Put another way, if you test it out and see longer delays than
> the
> > > > > >> sync producer had, we need to find out why and fix it.
> > > > > >>
> > > > > >> Gwen
> > > > > >>
> > > > > >> On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
> > > > > >> <ot...@gmail.com> wrote:
> > > > > >> > Hi,
> > > > > >> >
> > > > > >> > Nope, unfortunately it can't do that.  X is a remote app,
> > doesn't
> > > > > listen
> > > > > >> to
> > > > > >> > anything external, calls Y via HTTPS.  So X has to decide what
> > to
> > > do
> > > > > with
> > > > > >> > its data based on Y's synchronous response.  It has to block
> > > until Y
> > > > > >> > responds.  And it wouldn't be pretty, I think, because nobody
> > > wants
> > > > to
> > > > > >> run
> > > > > > >> > apps that talk to remote servers and hang on to connections
> more
> > > > than
> > > > > >> they
> > > > > >> > have to.  But perhaps that is the only way?  Or maybe the
> answer
> > > to
> > > > > "I'm
> > > > > >> > guessing the delay would be more or less the same as if the
> > > Producer
> > > > > was
> > > > > >> > using SYNC mode?" is YES, in which case the connection from X
> > to Y
> > > > > would
> > > > > >> be
> > > > > >> > open for just as long as with a SYNC producer running in Y?
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> > Otis
> > > > > >> > --
> > > > > >> > Monitoring * Alerting * Anomaly Detection * Centralized Log
> > > > Management
> > > > > >> > Solr & Elasticsearch Support * http://sematext.com/
> > > > > >> >
> > > > > >> >
> > > > > >> > On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira <
> > > gshapira@cloudera.com
> > > > >
> > > > > >> wrote:
> > > > > >> >
> > > > > >> >> Can Y have a callback that will handle the notification to X?
> > > > > >> >> In this case, perhaps Y can be async and X can buffer the
> data
> > > > until
> > > > > >> >> the callback triggers and says "all good" (or resend if the
> > > > callback
> > > > > >> >> indicates an error)
> > > > > >> >>
> > > > > >> >> On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
> > > > > >> >> <ot...@gmail.com> wrote:
> > > > > >> >> > Hi,
> > > > > >> >> >
> > > > > >> >> > Thanks for the info.  Here's the use case.  We have
> something
> > > up
> > > > > >> stream
> > > > > >> >> > sending data, say a log shipper called X.  It sends it to
> > some
> > > > > remote
> > > > > >> >> > component Y.  Y is the Kafka Producer and it puts data into
> > > > Kafka.
> > > > > >> But Y
> > > > > >> >> > needs to send a reply to X and tell it whether it
> > successfully
> > > > put
> > > > > all
> > > > > >> >> its
> > > > > >> >> > data into Kafka.  If it did not, Y wants to tell X to
> buffer
> > > data
> > > > > >> locally
> > > > > >> >> > and resend it later.
> > > > > >> >> >
> > > > > >> >> > If producer is ONLY async, Y can't easily do that.  Or
> maybe
> > Y
> > > > > would
> > > > > >> just
> > > > > >> >> > need to wait for the Future to come back and only then send
> > the
> > > > > >> response
> > > > > >> >> > back to X?  If so, I'm guessing the delay would be more or
> > less
> > > > the
> > > > > >> same
> > > > > >> >> as
> > > > > >> >> > if the Producer was using SYNC mode?
> > > > > >> >> >
> > > > > >> >> > Thanks,
> > > > > >> >> > Otis
> > > > > >> >> > --
> > > > > >> >> > Monitoring * Alerting * Anomaly Detection * Centralized Log
> > > > > Management
> > > > > >> >> > Solr & Elasticsearch Support * http://sematext.com/
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> > On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps <
> > jay.kreps@gmail.com
> > > >
> > > > > >> wrote:
> > > > > >> >> >
> > > > > >> >> >> Yeah as Gwen says there is no sync/async mode anymore.
> There
> > > is
> > > > a
> > > > > new
> > > > > >> >> >> configuration which does a lot of what async did in terms
> of
> > > > > allowing
> > > > > >> >> >> batching:
> > > > > >> >> >>
> > > > > >> >> >> batch.size - This is the target amount of data per
> partition
> > > the
> > > > > >> server
> > > > > >> >> >> will attempt to batch together.
> > > > > >> >> >> linger.ms - This is the time the producer will wait for
> > more
> > > > data
> > > > > >> to be
> > > > > >> >> >> sent to better batch up writes. The default is 0 (send
> > > > > immediately).
> > > > > >> So
> > > > > >> >> if
> > > > > >> >> >> you set this to 50 ms the client will send immediately if
> it
> > > has
> > > > > >> already
> > > > > >> >> >> filled up its batch, otherwise it will wait to accumulate
> > the
> > > > > number
> > > > > >> of
> > > > > >> >> >> bytes given by batch.size.
> > > > > >> >> >>
> > > > > >> >> >> To send asynchronously you do
> > > > > >> >> >>    producer.send(record)
> > > > > >> >> >> whereas to block on a response you do
> > > > > >> >> >>    producer.send(record).get();
> > > > > >> >> >> which will wait for acknowledgement from the server.
> > > > > >> >> >>
> > > > > >> >> >> One advantage of this model is that the client will do
> it's
> > > best
> > > > > to
> > > > > >> >> batch
> > > > > >> >> >> under the covers even if linger.ms=0. It will do this by
> > > > batching
> > > > > >> any
> > > > > >> >> data
> > > > > >> >> >> that arrives while another send is in progress into a
> single
> > > > > >> >> >> request--giving a kind of "group commit" effect.
> > > > > >> >> >>
> > > > > >> >> >> The hope is that this will be both simpler to understand
> (a
> > > > single
> > > > > >> api
> > > > > >> >> that
> > > > > >> >> >> always works the same) and more powerful (you always get a
> > > > > response
> > > > > >> with
> > > > > >> >> >> error and offset information whether or not you choose to
> > use
> > > > it).
> > > > > >> >> >>
> > > > > >> >> >> -Jay
> > > > > >> >> >>
> > > > > >> >> >>
> > > > > >> >> >> On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira <
> > > > > gshapira@cloudera.com
> > > > > >> >
> > > > > >> >> >> wrote:
> > > > > >> >> >>
> > > > > >> >> >> > If you want to emulate the old sync producer behavior,
> you
> > > > need
> > > > > to
> > > > > >> set
> > > > > >> >> >> > the batch size to 1  (in producer config) and wait on
> the
> > > > future
> > > > > >> you
> > > > > >> >> >> > get from Send (i.e. future.get)
> > > > > >> >> >> >
> > > > > >> >> >> > I can't think of good reasons to do so, though.
> > > > > >> >> >> >
> > > > > >> >> >> > Gwen
> > > > > >> >> >> >
> > > > > >> >> >> >
> > > > > >> >> >> > On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic
> > > > > >> >> >> > <ot...@gmail.com> wrote:
> > > > > >> >> >> > > Hi,
> > > > > >> >> >> > >
> > > > > >> >> >> > > Is the plan for New Producer to have ONLY async mode?
> > I'm
> > > > > asking
> > > > > >> >> >> because
> > > > > >> >> >> > > of this info from the Wiki:
> > > > > >> >> >> > >
> > > > > >> >> >> > >
> > > > > >> >> >> > >    - The producer will always attempt to batch data
> and
> > > will
> > > > > >> always
> > > > > >> >> >> > >    immediately return a SendResponse which acts as a
> > > Future
> > > > to
> > > > > >> allow
> > > > > >> >> >> the
> > > > > >> >> >> > >    client to await the completion of the request.
> > > > > >> >> >> > >
> > > > > >> >> >> > >
> > > > > >> >> >> > > The word "always" makes me think there will be no sync
> > > mode.
> > > > > >> >> >> > >
> > > > > >> >> >> > > Thanks,
> > > > > >> >> >> > > Otis
> > > > > >> >> >> > > --
> > > > > >> >> >> > > Monitoring * Alerting * Anomaly Detection *
> Centralized
> > > Log
> > > > > >> >> Management
> > > > > >> >> >> > > Solr & Elasticsearch Support * http://sematext.com/
> > > > > >> >> >> >
> > > > > >> >> >>
> > > > > >> >>
> > > > > >>
> > > > >
> > > >
> > >
> >
>

Re: New Producer - ONLY sync mode?

Posted by Steve Morin <st...@stevemorin.com>.
Looking at this thread, I would ideally want at least the right recipe to
mimic sync behavior like Otis is talking about.

In the second case, I would like to be able to know individually whether
messages have failed, regardless of whether they are in separate batches,
sort of like what Kinesis does, as Pradeep mentioned.
-Steve
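
A rough sketch of both requests, for illustration only: send everything first, then wait on each future and record a per-message outcome. It does not use the real Kafka client. The `send` function below is a hypothetical stand-in for `producer.send(...)` that returns a `Future<Long>` offset, and `Result`, `sendAllAndWait` and `fakeProducer` are made-up names. With the real producer the future type would be `Future<RecordMetadata>`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

final class SyncSendSketch {

    /** Outcome of one record: either an offset or the error that failed it. */
    static final class Result {
        final String record;
        final Long offset;      // null when the send failed
        final Throwable error;  // null when the send succeeded

        Result(String record, Long offset, Throwable error) {
            this.record = record;
            this.offset = offset;
            this.error = error;
        }

        boolean failed() { return error != null; }
    }

    /**
     * Queues every record before waiting on any future, so the sender is
     * still free to batch, then blocks until each one is acknowledged.
     * `send` is a hypothetical stand-in for producer::send.
     */
    static List<Result> sendAllAndWait(List<String> records,
                                       Function<String, Future<Long>> send) {
        List<Future<Long>> futures = new ArrayList<>();
        for (String record : records) {
            futures.add(send.apply(record));
        }
        List<Result> results = new ArrayList<>();
        for (int i = 0; i < records.size(); i++) {
            String record = records.get(i);
            try {
                results.add(new Result(record, futures.get(i).get(), null));
            } catch (ExecutionException e) {
                // The send itself failed; keep the cause next to its record.
                results.add(new Result(record, null, e.getCause()));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                results.add(new Result(record, null, e));
            }
        }
        return results;
    }

    /** Stand-in producer: rejects records containing "bad", else assigns offsets. */
    static Function<String, Future<Long>> fakeProducer() {
        long[] nextOffset = {0};
        return record -> {
            CompletableFuture<Long> f = new CompletableFuture<>();
            if (record.contains("bad")) {
                f.completeExceptionally(new IllegalStateException("rejected: " + record));
            } else {
                f.complete(nextOffset[0]++);
            }
            return f;
        };
    }
}
```

The caller can then reply to the upstream sender only after `sendAllAndWait` returns, and retry or hand back just the entries where `failed()` is true, whichever batch they ended up in.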

On Wed, Feb 4, 2015 at 11:19 AM, Jay Kreps <ja...@gmail.com> wrote:

> Yeah totally. Using a callback is, of course, the Right Thing for this kind
> of stuff. But I have found that kind of asynchronous thinking can be hard
> for people. Even if you get out of the pre-java 8 syntactic pain that
> anonymous inner classes inflict just dealing with multiple threads of
> control without creating async spaghetti can be a challenge for complex
> stuff. That is really the only reason for the futures in the api, they are
> strictly less powerful than the callbacks, but at least using them you can
> just call .get() and pretend it is blocking.
>
> -Jay
>
> On Wed, Feb 4, 2015 at 7:19 AM, Joe Stein <jo...@stealth.ly> wrote:
>
> > Now that 0.8.2.0 is in the wild I look forward to working with it more and
> > seeing what folks start to do with this function
> >
> > https://dist.apache.org/repos/dist/release/kafka/0.8.2.0/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback)
> > and keeping it fully non-blocking.
> >
> > One sprint I know of coming up is going to have the new producer as a
> > component in their reactive calls and handling bookkeeping and retries
> > through that type of callback approach. Should work well (haven't tried
> > but don't see why not) with Akka, ScalaZ, RxJava, Finagle, etc, etc, etc
> > in functional languages and frameworks.
> >
> > I think as JDK 8 starts to get out in the wild more too (maybe after JDK 7
> > EOL), the use of .get will be reduced (imho) and folks will be thinking
> > more about non-blocking vs blocking and not so much about sync vs async,
> > but my crystal ball is just back from the shop so we'll see =8^)
> >
> > /*******************************************
> >  Joe Stein
> >  Founder, Principal Consultant
> >  Big Data Open Source Security LLC
> >  http://www.stealth.ly
> >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > ********************************************/
> >
> > On Tue, Feb 3, 2015 at 10:45 PM, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > Hey guys,
> > >
> > > I guess the question is whether it really matters how many underlying
> > > network requests occur? It is very hard for an application to depend on
> > > this even in the old producer since it depends on the partitions
> > placement
> > > (a send to two partitions may go to either one machine or two and so it
> > > will send either one or two requests). So when you send a batch in one
> > call
> > > you may feel that is "all at once", but that is only actually
> guaranteed
> > if
> > > all messages have the same partition.
> > >
> > > The challenge is allowing even this in the presence of bounded request
> > > sizes which we have in the new producer. The user sends a list of
> objects
> > > and the serialized size that will result is not very apparent to them.
> If
> > > you break it up into multiple requests then that is kind of further
> > ruining
> > > the illusion of a single send. If you don't then you have to just error
> > out
> > > which is equally annoying to have to handle.
> > >
> > > But I'm not sure if from your description you are saying you actually
> > care
> > > how many physical requests are issued. I think it is more like it is
> just
> > > syntactically annoying to send a batch of data now because it needs a
> for
> > > loop.
> > >
> > > Currently to do this you would do:
> > >
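> > > > List<Future<RecordMetadata>> responses = new ArrayList<>();
> > > > for (ProducerRecord<K, V> input : recordBatch)
> > > >     responses.add(producer.send(input));  // queue every send first
> > > > for (Future<RecordMetadata> response : responses)
> > > >     response.get();                        // then block on each ack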
> > >
> > > If you don't depend on the offset/error info we could add a flush call
> so
> > > you could instead do
> > > > for (ProducerRecord<K, V> input : recordBatch)
> > > >     producer.send(input);
> > > > producer.flush();
> > >
> > > But if you do want the error/offset then you are going to be back to
> the
> > > original case.
> > >
> > > Thoughts?
> > >
> > > -Jay
> > >
> > > On Mon, Feb 2, 2015 at 1:48 PM, Gwen Shapira <gs...@cloudera.com>
> > > wrote:
> > >
> > > > I've been thinking about that too, since both Flume and Sqoop rely on
> > > > send(List) API of the old API.
> > > >
> > > > I'd like to see this API come back, but I'm debating how we'd handle
> > > > errors. IIRC, the old API would fail an entire batch on a single
> > > > error, which can lead to duplicates. Having N callbacks lets me retry
> > > > / save / whatever just the messages that had issues.
> > > >
> > > > If messages had identifiers from the producer side, we could have the
> > > > API call the callback with a list of message-ids and their status.
> But
> > > > they don't :)
> > > >
> > > > Any thoughts on how you'd like it to work?
> > > >
> > > > Gwen
> > > >
> > > >
> > > > On Mon, Feb 2, 2015 at 1:38 PM, Pradeep Gollakota <
> > pradeepg26@gmail.com>
> > > > wrote:
> > > > > This is a great question Otis. Like Gwen said, you can accomplish
> > Sync
> > > > mode
> > > > > by setting the batch size to 1. But this does highlight a
> shortcoming
> > > of
> > > > > the new producer API.
> > > > >
> > > > > I really like the design of the new API and it has really great
> > > > properties
> > > > > and I'm enjoying working with it. However, one API that I think
> > we're
> > > > > lacking is a "batch" API. Currently, I have to iterate over a batch
> > and
> > > > > call .send() on each record, which returns n callbacks instead of 1
> > > > > callback for the whole batch. This significantly complicates
> recovery
> > > > logic
> > > > > where we need to commit a batch as opposed to 1 record at a time.
> > > > >
> > > > > Do you guys have any plans to add better semantics around batches?
> > > > >
> > > > > On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira <
> gshapira@cloudera.com>
> > > > wrote:
> > > > >
> > > > >> If I understood the code and Jay correctly - if you wait for the
> > > > >> future it will be a similar delay to that of the old sync
> producer.
> > > > >>
> > > > >> Put another way, if you test it out and see longer delays than the
> > > > >> sync producer had, we need to find out why and fix it.
> > > > >>
> > > > >> Gwen
> > > > >>
> > > > >> On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
> > > > >> <ot...@gmail.com> wrote:
> > > > >> > Hi,
> > > > >> >
> > > > >> > Nope, unfortunately it can't do that.  X is a remote app,
> doesn't
> > > > listen
> > > > >> to
> > > > >> > anything external, calls Y via HTTPS.  So X has to decide what
> to
> > do
> > > > with
> > > > >> > its data based on Y's synchronous response.  It has to block
> > until Y
> > > > >> > responds.  And it wouldn't be pretty, I think, because nobody
> > wants
> > > to
> > > > >> run
> > > > >> > apps that talk to remote servers and hang on to connections more
> > > than
> > > > >> they
> > > > >> > have to.  But perhaps that is the only way?  Or maybe the answer
> > to
> > > > "I'm
> > > > >> > guessing the delay would be more or less the same as if the
> > Producer
> > > > was
> > > > >> > using SYNC mode?" is YES, in which case the connection from X
> to Y
> > > > would
> > > > >> be
> > > > >> > open for just as long as with a SYNC producer running in Y?
> > > > >> >
> > > > >> > Thanks,
> > > > >> > Otis
> > > > >> > --
> > > > >> > Monitoring * Alerting * Anomaly Detection * Centralized Log
> > > Management
> > > > >> > Solr & Elasticsearch Support * http://sematext.com/
> > > > >> >
> > > > >> >
> > > > >> > On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira <
> > gshapira@cloudera.com
> > > >
> > > > >> wrote:
> > > > >> >
> > > > >> >> Can Y have a callback that will handle the notification to X?
> > > > >> >> In this case, perhaps Y can be async and X can buffer the data
> > > until
> > > > >> >> the callback triggers and says "all good" (or resend if the
> > > callback
> > > > >> >> indicates an error)
> > > > >> >>
> > > > >> >> On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
> > > > >> >> <ot...@gmail.com> wrote:
> > > > >> >> > Hi,
> > > > >> >> >
> > > > >> >> > Thanks for the info.  Here's the use case.  We have something
> > up
> > > > >> stream
> > > > >> >> > sending data, say a log shipper called X.  It sends it to
> some
> > > > remote
> > > > >> >> > component Y.  Y is the Kafka Producer and it puts data into
> > > Kafka.
> > > > >> But Y
> > > > >> >> > needs to send a reply to X and tell it whether it
> successfully
> > > put
> > > > all
> > > > >> >> its
> > > > >> >> > data into Kafka.  If it did not, Y wants to tell X to buffer
> > data
> > > > >> locally
> > > > >> >> > and resend it later.
> > > > >> >> >
> > > > >> >> > If producer is ONLY async, Y can't easily do that.  Or maybe
> Y
> > > > would
> > > > >> just
> > > > >> >> > need to wait for the Future to come back and only then send
> the
> > > > >> response
> > > > >> >> > back to X?  If so, I'm guessing the delay would be more or
> less
> > > the
> > > > >> same
> > > > >> >> as
> > > > >> >> > if the Producer was using SYNC mode?
> > > > >> >> >
> > > > >> >> > Thanks,
> > > > >> >> > Otis
> > > > >> >> > --
> > > > >> >> > Monitoring * Alerting * Anomaly Detection * Centralized Log
> > > > Management
> > > > >> >> > Solr & Elasticsearch Support * http://sematext.com/
> > > > >> >> >
> > > > >> >> >
> > > > >> >> > On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps <
> jay.kreps@gmail.com
> > >
> > > > >> wrote:
> > > > >> >> >
> > > > >> >> >> Yeah as Gwen says there is no sync/async mode anymore. There
> > is
> > > a
> > > > new
> > > > >> >> >> configuration which does a lot of what async did in terms of
> > > > allowing
> > > > >> >> >> batching:
> > > > >> >> >>
> > > > >> >> >> batch.size - This is the target amount of data per partition
> > the
> > > > >> server
> > > > >> >> >> will attempt to batch together.
> > > > >> >> >> linger.ms - This is the time the producer will wait for
> more
> > > data
> > > > >> to be
> > > > >> >> >> sent to better batch up writes. The default is 0 (send
> > > > immediately).
> > > > >> So
> > > > >> >> if
> > > > >> >> >> you set this to 50 ms the client will send immediately if it
> > has
> > > > >> already
> > > > >> >> >> filled up its batch, otherwise it will wait to accumulate
> the
> > > > number
> > > > >> of
> > > > >> >> >> bytes given by batch.size.
> > > > >> >> >>
> > > > >> >> >> To send asynchronously you do
> > > > >> >> >>    producer.send(record)
> > > > >> >> >> whereas to block on a response you do
> > > > >> >> >>    producer.send(record).get();
> > > > >> >> >> which will wait for acknowledgement from the server.
> > > > >> >> >>
> > > > >> >> >> One advantage of this model is that the client will do its
> > best
> > > > to
> > > > >> >> batch
> > > > >> >> >> under the covers even if linger.ms=0. It will do this by
> > > batching
> > > > >> any
> > > > >> >> data
> > > > >> >> >> that arrives while another send is in progress into a single
> > > > >> >> >> request--giving a kind of "group commit" effect.
> > > > >> >> >>
> > > > >> >> >> The hope is that this will be both simpler to understand (a
> > > single
> > > > >> api
> > > > >> >> that
> > > > >> >> >> always works the same) and more powerful (you always get a
> > > > response
> > > > >> with
> > > > >> >> >> error and offset information whether or not you choose to
> use
> > > it).
> > > > >> >> >>
> > > > >> >> >> -Jay
> > > > >> >> >>
> > > > >> >> >>
> > > > >> >> >> On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira <
> > > > gshapira@cloudera.com
> > > > >> >
> > > > >> >> >> wrote:
> > > > >> >> >>
> > > > >> >> >> > If you want to emulate the old sync producer behavior, you
> > > need
> > > > to
> > > > >> set
> > > > >> >> >> > the batch size to 1  (in producer config) and wait on the
> > > future
> > > > >> you
> > > > >> >> >> > get from Send (i.e. future.get)
> > > > >> >> >> >
> > > > >> >> >> > I can't think of good reasons to do so, though.
> > > > >> >> >> >
> > > > >> >> >> > Gwen
> > > > >> >> >> >
> > > > >> >> >> >
> > > > >> >> >> > On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic
> > > > >> >> >> > <ot...@gmail.com> wrote:
> > > > >> >> >> > > Hi,
> > > > >> >> >> > >
> > > > >> >> >> > > Is the plan for New Producer to have ONLY async mode?
> I'm
> > > > asking
> > > > >> >> >> because
> > > > >> >> >> > > of this info from the Wiki:
> > > > >> >> >> > >
> > > > >> >> >> > >
> > > > >> >> >> > >    - The producer will always attempt to batch data and
> > will
> > > > >> always
> > > > >> >> >> > >    immediately return a SendResponse which acts as a
> > Future
> > > to
> > > > >> allow
> > > > >> >> >> the
> > > > >> >> >> > >    client to await the completion of the request.
> > > > >> >> >> > >
> > > > >> >> >> > >
> > > > >> >> >> > > The word "always" makes me think there will be no sync
> > mode.
> > > > >> >> >> > >
> > > > >> >> >> > > Thanks,
> > > > >> >> >> > > Otis
> > > > >> >> >> > > --
> > > > >> >> >> > > Monitoring * Alerting * Anomaly Detection * Centralized
> > Log
> > > > >> >> Management
> > > > >> >> >> > > Solr & Elasticsearch Support * http://sematext.com/
> > > > >> >> >> >
> > > > >> >> >>
> > > > >> >>
> > > > >>
> > > >
> > >
> >
>

Re: New Producer - ONLY sync mode?

Posted by Jay Kreps <ja...@gmail.com>.
Yeah totally. Using a callback is, of course, the Right Thing for this kind
of stuff. But I have found that kind of asynchronous thinking can be hard
for people. Even if you get out of the pre-Java 8 syntactic pain that
anonymous inner classes inflict, just dealing with multiple threads of
control without creating async spaghetti can be a challenge for complex
stuff. That is really the only reason for the futures in the API: they are
strictly less powerful than the callbacks, but at least using them you can
just call .get() and pretend it is blocking.

-Jay
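
To make the "strictly less powerful" point concrete, here is a minimal sketch, not the real Kafka client, of getting the blocking style out of a callback-only send. `CallbackSender`, `sendAndWait` and `asyncSender` are hypothetical names invented for the illustration. In the actual producer the callback type is `org.apache.kafka.clients.producer.Callback`, which receives a `RecordMetadata` and an `Exception`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

final class CallbackVsFutureSketch {

    /** Hypothetical stand-in for a producer whose send() takes a completion callback. */
    interface CallbackSender {
        // The callback receives (offset, error); exactly one of the two is non-null.
        void send(String record, BiConsumer<Long, Exception> callback);
    }

    /**
     * Builds the blocking style on top of the callback style: the callback
     * completes a future and the caller simply waits on it. Going the other
     * way, from a plain Future to a callback, would need a thread parked in
     * get(), which is the sense in which futures are the weaker primitive.
     */
    static long sendAndWait(CallbackSender sender, String record) {
        CompletableFuture<Long> done = new CompletableFuture<>();
        sender.send(record, (offset, error) -> {
            if (error != null) {
                done.completeExceptionally(error);
            } else {
                done.complete(offset);
            }
        });
        return done.join(); // throws CompletionException if the send failed
    }

    /** Stand-in sender that acknowledges from another thread, as a network client would. */
    static CallbackSender asyncSender(long offsetToReturn) {
        return (record, callback) ->
            new Thread(() -> callback.accept(offsetToReturn, null)).start();
    }
}
```

A caller that only wants "pretend it is blocking" uses `sendAndWait`. A caller that wants to stay non-blocking passes its own callback to `send` and never waits.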

On Wed, Feb 4, 2015 at 7:19 AM, Joe Stein <jo...@stealth.ly> wrote:

> Now that 0.8.2.0 is in the wild I look forward to working with it more and
> seeing what folks start to do with this function
>
> https://dist.apache.org/repos/dist/release/kafka/0.8.2.0/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback)
> and keeping it fully non-blocking.
>
> One sprint I know of coming up is going to have the new producer as a
> component in their reactive calls and handling bookkeeping and retries
> through that type of callback approach. Should work well (haven't tried
> but don't see why not) with Akka, ScalaZ, RxJava, Finagle, etc, etc, etc in
> functional languages and frameworks.
>
> I think as JDK 8 starts to get out in the wild more too (maybe after JDK 7
> EOL), the use of .get will be reduced (imho) and folks will be thinking more
> about non-blocking vs blocking and not so much about sync vs async, but my
> crystal ball is just back from the shop so we'll see =8^)
>
> /*******************************************
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********************************************/
>
> On Tue, Feb 3, 2015 at 10:45 PM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Hey guys,
> >
> > I guess the question is whether it really matters how many underlying
> > network requests occur? It is very hard for an application to depend on
> > this even in the old producer since it depends on the partitions
> placement
> > (a send to two partitions may go to either one machine or two and so it
> > will send either one or two requests). So when you send a batch in one
> call
> > you may feel that is "all at once", but that is only actually guaranteed
> if
> > all messages have the same partition.
> >
> > The challenge is allowing even this in the presence of bounded request
> > sizes which we have in the new producer. The user sends a list of objects
> > and the serialized size that will result is not very apparent to them. If
> > you break it up into multiple requests then that is kind of further
> ruining
> > the illusion of a single send. If you don't then you have to just error
> out
> > which is equally annoying to have to handle.
> >
> > But I'm not sure if from your description you are saying you actually
> care
> > how many physical requests are issued. I think it is more like it is just
> > syntactically annoying to send a batch of data now because it needs a for
> > loop.
> >
> > Currently to do this you would do:
> >
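> > List<Future<RecordMetadata>> responses = new ArrayList<>();
> > for (ProducerRecord<K, V> input : recordBatch)
> >     responses.add(producer.send(input));  // queue every send first
> > for (Future<RecordMetadata> response : responses)
> >     response.get();                        // then block on each ack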
> >
> > If you don't depend on the offset/error info we could add a flush call so
> > you could instead do
> > for (ProducerRecord<K, V> input : recordBatch)
> >     producer.send(input);
> > producer.flush();
> >
> > But if you do want the error/offset then you are going to be back to the
> > original case.
> >
> > Thoughts?
> >
> > -Jay
> >
> > On Mon, Feb 2, 2015 at 1:48 PM, Gwen Shapira <gs...@cloudera.com>
> > wrote:
> >
> > > I've been thinking about that too, since both Flume and Sqoop rely on
> > > send(List) API of the old API.
> > >
> > > I'd like to see this API come back, but I'm debating how we'd handle
> > > errors. IIRC, the old API would fail an entire batch on a single
> > > error, which can lead to duplicates. Having N callbacks lets me retry
> > > / save / whatever just the messages that had issues.
> > >
> > > If messages had identifiers from the producer side, we could have the
> > > API call the callback with a list of message-ids and their status. But
> > > they don't :)
> > >
> > > Any thoughts on how you'd like it to work?
> > >
> > > Gwen
> > >
> > >
> > > On Mon, Feb 2, 2015 at 1:38 PM, Pradeep Gollakota <
> pradeepg26@gmail.com>
> > > wrote:
> > > > This is a great question Otis. Like Gwen said, you can accomplish
> Sync
> > > mode
> > > > by setting the batch size to 1. But this does highlight a shortcoming
> > of
> > > > the new producer API.
> > > >
> > > > I really like the design of the new API and it has really great
> > > properties
> > > > and I'm enjoying working with it. However, one API that I think
> we're
> > > > lacking is a "batch" API. Currently, I have to iterate over a batch
> and
> > > > call .send() on each record, which returns n callbacks instead of 1
> > > > callback for the whole batch. This significantly complicates recovery
> > > logic
> > > > where we need to commit a batch as opposed to 1 record at a time.
> > > >
> > > > Do you guys have any plans to add better semantics around batches?
> > > >
> > > > On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira <gs...@cloudera.com>
> > > wrote:
> > > >
> > > >> If I understood the code and Jay correctly - if you wait for the
> > > >> future it will be a similar delay to that of the old sync producer.
> > > >>
> > > >> Put another way, if you test it out and see longer delays than the
> > > >> sync producer had, we need to find out why and fix it.
> > > >>
> > > >> Gwen
> > > >>
> > > >> On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
> > > >> <ot...@gmail.com> wrote:
> > > >> > Hi,
> > > >> >
> > > >> > Nope, unfortunately it can't do that.  X is a remote app, doesn't
> > > listen
> > > >> to
> > > >> > anything external, calls Y via HTTPS.  So X has to decide what to
> do
> > > with
> > > >> > its data based on Y's synchronous response.  It has to block
> until Y
> > > >> > responds.  And it wouldn't be pretty, I think, because nobody
> wants
> > to
> > > >> run
> > > >> > apps that talk to remote servers and hang on to connections more
> > than
> > > >> they
> > > >> > have to.  But perhaps that is the only way?  Or maybe the answer
> to
> > > "I'm
> > > >> > guessing the delay would be more or less the same as if the
> Producer
> > > was
> > > >> > using SYNC mode?" is YES, in which case the connection from X to Y
> > > would
> > > >> be
> > > >> > open for just as long as with a SYNC producer running in Y?
> > > >> >
> > > >> > Thanks,
> > > >> > Otis
> > > >> > --
> > > >> > Monitoring * Alerting * Anomaly Detection * Centralized Log
> > Management
> > > >> > Solr & Elasticsearch Support * http://sematext.com/

Re: New Producer - ONLY sync mode?

Posted by Joe Stein <jo...@stealth.ly>.
Now that 0.8.2.0 is in the wild, I look forward to working with it more and
seeing what folks start to do with this function
https://dist.apache.org/repos/dist/release/kafka/0.8.2.0/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord,
org.apache.kafka.clients.producer.Callback) and keeping it fully
non-blocking.

One sprint I know of coming up is going to have the new producer as a
component in their reactive calls, handling bookkeeping and retries
through that type of callback approach. It should work well (haven't tried
it, but don't see why not) with Akka, ScalaZ, RxJava, Finagle, etc. in
functional languages and frameworks.

I think as JDK 8 gets out in the wild more too (maybe after the JDK 7
EOL), the use of .get will be reduced (imho) and folks will be thinking more
about non-blocking vs blocking and not so much about sync vs async, but my
crystal ball is just back from the shop so we'll see =8^)
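
To make the non-blocking idea a bit more concrete, here is a rough sketch of
bridging a callback-style send into a JDK 8 CompletableFuture, so that
bookkeeping and a retry can be chained instead of blocked on with .get.
AckCallback and CallbackSender are hypothetical stand-ins that only mimic the
shape of send(record, callback); they are not the Kafka client's own classes,
and the single-retry policy is just an assumption for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a completion callback: error is null on success.
interface AckCallback {
    void onCompletion(Long offset, Exception error);
}

// Hypothetical stand-in for a producer with a callback-style send.
interface CallbackSender {
    void send(String record, AckCallback callback);
}

final class NonBlockingSend {
    // Bridge the callback into a CompletableFuture so callers can chain work.
    static CompletableFuture<Long> sendAsync(CallbackSender sender, String record) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        sender.send(record, (offset, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(offset);
            }
        });
        return result;
    }

    // Retry once on failure without calling get() anywhere on the send path.
    static CompletableFuture<Long> sendWithOneRetry(CallbackSender sender, String record) {
        return sendAsync(sender, record)
                .thenApply(CompletableFuture::completedFuture) // success: wrap the offset
                .exceptionally(error -> sendAsync(sender, record)) // failure: send again
                .thenCompose(f -> f); // unwrap the nested future
    }
}
```

The caller gets a future back immediately and can attach further stages
(logging, acking upstream) rather than parking a thread on each send.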

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/

On Tue, Feb 3, 2015 at 10:45 PM, Jay Kreps <ja...@gmail.com> wrote:

> Hey guys,
>
> I guess the question is whether it really matters how many underlying
> network requests occur? It is very hard for an application to depend on
> this even in the old producer since it depends on the partition placement
> (a send to two partitions may go to either one machine or two and so it
> will send either one or two requests). So when you send a batch in one call
> you may feel that is "all at once", but that is only actually guaranteed if
> all messages have the same partition.
>
> The challenge is allowing even this in the presence of bounded request
> sizes which we have in the new producer. The user sends a list of objects
> and the serialized size that will result is not very apparent to them. If
> you break it up into multiple requests then that is kind of further ruining
> the illusion of a single send. If you don't then you have to just error out
> which is equally annoying to have to handle.
>
> But I'm not sure if from your description you are saying you actually care
> how many physical requests are issued. I think it is more like it is just
> syntactically annoying to send a batch of data now because it needs a for
> loop.
>
> Currently to do this you would do:
>
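> // send() returns a Future<RecordMetadata>; get() blocks and throws if the send failed
> List<Future<RecordMetadata>> responses = new ArrayList<>();
> for (ProducerRecord<K, V> input : recordBatch)
>     responses.add(producer.send(input));
> for (Future<RecordMetadata> response : responses)
>     response.get();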
>
> If you don't depend on the offset/error info we could add a flush call so
> you could instead do
> for (ProducerRecord<K, V> input : recordBatch)
>     producer.send(input);
> producer.flush();
>
> But if you do want the error/offset then you are going to be back to the
> original case.
>
> Thoughts?
>
> -Jay

Re: New Producer - ONLY sync mode?

Posted by Jay Kreps <ja...@gmail.com>.
Hey guys,

I guess the question is whether it really matters how many underlying
network requests occur? It is very hard for an application to depend on
this even in the old producer since it depends on the partition placement
(a send to two partitions may go to either one machine or two and so it
will send either one or two requests). So when you send a batch in one call
you may feel that is "all at once", but that is only actually guaranteed if
all messages have the same partition.

The challenge is allowing even this in the presence of bounded request
sizes which we have in the new producer. The user sends a list of objects
and the serialized size that will result is not very apparent to them. If
you break it up into multiple requests then that is kind of further ruining
the illusion of a single send. If you don't then you have to just error out
which is equally annoying to have to handle.

But I'm not sure if from your description you are saying you actually care
how many physical requests are issued. I think it is more like it is just
syntactically annoying to send a batch of data now because it needs a for
loop.

Currently to do this you would do:

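// send() returns a Future<RecordMetadata>; get() blocks and throws if the send failed
List<Future<RecordMetadata>> responses = new ArrayList<>();
for (ProducerRecord<K, V> input : recordBatch)
    responses.add(producer.send(input));
for (Future<RecordMetadata> response : responses)
    response.get();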

If you don't depend on the offset/error info we could add a flush call so
you could instead do
for (ProducerRecord<K, V> input : recordBatch)
    producer.send(input);
producer.flush();

But if you do want the error/offset then you are going to be back to the
original case.
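
To compare the two shapes side by side, here is a self-contained sketch. It
does not use the real client: BufferingProducer is a hypothetical in-memory
stand-in that hands out fake offsets, its flush() plays the role of the
network round trip, and join() is used in place of get() only to avoid
checked exceptions. Treat it as an illustration of the trade-off, not of how
the producer is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical in-memory stand-in for a producer; it only mimics send()/flush().
final class BufferingProducer {
    private final List<CompletableFuture<Long>> pending = new ArrayList<>();
    private long nextOffset = 0;

    // Buffers the record and returns immediately with an incomplete future.
    CompletableFuture<Long> send(String record) {
        CompletableFuture<Long> ack = new CompletableFuture<>();
        pending.add(ack);
        return ack;
    }

    // Completes every buffered send, assigning fake offsets in send order.
    void flush() {
        for (CompletableFuture<Long> ack : pending) {
            ack.complete(nextOffset++);
        }
        pending.clear();
    }
}

final class BatchSend {
    // Pattern 1: keep every future so each record's offset (or error) is available.
    static List<Long> sendAndCollectOffsets(BufferingProducer producer, List<String> batch) {
        List<CompletableFuture<Long>> responses = new ArrayList<>();
        for (String input : batch) {
            responses.add(producer.send(input));
        }
        producer.flush(); // stand-in only: nothing completes here without it
        List<Long> offsets = new ArrayList<>();
        for (CompletableFuture<Long> response : responses) {
            offsets.add(response.join()); // would throw if that send had failed
        }
        return offsets;
    }

    // Pattern 2: fire the sends and flush; no per-record result is kept.
    static void sendAndFlush(BufferingProducer producer, List<String> batch) {
        for (String input : batch) {
            producer.send(input);
        }
        producer.flush();
    }
}
```

The second form is shorter, but the caller gives up the per-record
offset/error information unless it holds on to the futures anyway.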

Thoughts?

-Jay

On Mon, Feb 2, 2015 at 1:48 PM, Gwen Shapira <gs...@cloudera.com> wrote:

> I've been thinking about that too, since both Flume and Sqoop rely on
> send(List) API of the old API.
>
> I'd like to see this API come back, but I'm debating how we'd handle
> errors. IIRC, the old API would fail an entire batch on a single
> error, which can lead to duplicates. Having N callbacks lets me retry
> / save / whatever just the messages that had issues.
>
> If messages had identifiers from the producer side, we could have the
> API call the callback with a list of message-ids and their status. But
> they don't :)
>
> Any thoughts on how you'd like it to work?
>
> Gwen

Re: New Producer - ONLY sync mode?

Posted by Pradeep Gollakota <pr...@gmail.com>.
I looked at the newly added batch API to Kinesis for inspiration. The
response on the batch put is a list of message-ids and their status (offset
if success else a failure code).

Ideally, I think the server should fail the entire batch or succeed the
entire batch (i.e. no duplicates), but this is pretty hard to implement.
Given that, what Kinesis did is probably a good compromise (perhaps while we
wait for exactly-once semantics :))
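
As a rough idea of what a per-record batch response could look like on the
client side, here is a small sketch that waits on a list of send futures and
reports one status per record instead of failing the whole batch.
RecordResult and BatchResults are hypothetical names made up for this
example, and plain CompletableFuture<Long> values stand in for whatever the
producer would actually return.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical per-record outcome: an offset on success, an error message on failure.
final class RecordResult {
    final int index;    // position of the record in the submitted batch
    final Long offset;  // non-null on success
    final String error; // non-null on failure

    RecordResult(int index, Long offset, String error) {
        this.index = index;
        this.offset = offset;
        this.error = error;
    }

    boolean succeeded() {
        return error == null;
    }
}

final class BatchResults {
    // Wait for every send and collect one status per record, in batch order.
    static List<RecordResult> awaitAll(List<CompletableFuture<Long>> sends) {
        List<RecordResult> results = new ArrayList<>();
        for (int i = 0; i < sends.size(); i++) {
            try {
                results.add(new RecordResult(i, sends.get(i).join(), null));
            } catch (CompletionException | CancellationException e) {
                // join() wraps the original failure; report the underlying cause.
                Throwable cause = e.getCause() != null ? e.getCause() : e;
                results.add(new RecordResult(i, null, String.valueOf(cause.getMessage())));
            }
        }
        return results;
    }
}
```

This keeps partial success visible to the caller, which is the part of the
Kinesis-style response that seems useful here; it says nothing about
atomicity on the server.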

In addition, perhaps adding a flush() method to the producer to allow for
control over when flushes happen might be another good starting point. With
the addition of a flush, it's easier to implement a "SyncProducer" by doing
something like, flush() -> n x send() -> flush(). This doesn't guarantee
that a particular batch isn't broken into two, but with sane batch sizes
and sane record sizes, we can assume the guarantee.

On Mon, Feb 2, 2015 at 1:48 PM, Gwen Shapira <gs...@cloudera.com> wrote:

> I've been thinking about that too, since both Flume and Sqoop rely on
> send(List) API of the old API.
>
> I'd like to see this API come back, but I'm debating how we'd handle
> errors. IIRC, the old API would fail an entire batch on a single
> error, which can lead to duplicates. Having N callbacks lets me retry
> / save / whatever just the messages that had issues.
>
> If messages had identifiers from the producer side, we could have the
> API call the callback with a list of message-ids and their status. But
> they don't :)
>
> Any thoughts on how you'd like it to work?
>
> Gwen

Re: New Producer - ONLY sync mode?

Posted by Gwen Shapira <gs...@cloudera.com>.
I've been thinking about that too, since both Flume and Sqoop rely on
the send(List) call of the old API.

I'd like to see this API come back, but I'm debating how we'd handle
errors. IIRC, the old API would fail an entire batch on a single
error, which can lead to duplicates. Having N callbacks lets me retry
/ save / whatever just the messages that had issues.

If messages had identifiers from the producer side, we could have the
API call the callback with a list of message-ids and their status. But
they don't :)
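
To make the "N callbacks" point concrete, here is a minimal sketch of
retrying only the records that failed. It does not use the Kafka client:
PerRecordRetrySketch, failedRecords and fakeSend are hypothetical names, and
a CompletableFuture<Long> stands in for the Future that the producer's send()
returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

// Sketch only: one future per record, so failures can be tracked per record.
final class PerRecordRetrySketch {

    // Sends every record first, then returns the records whose future failed.
    static List<String> failedRecords(List<String> batch,
            Function<String, CompletableFuture<Long>> send) {
        List<CompletableFuture<Long>> futures = new ArrayList<>();
        for (String record : batch) {
            futures.add(send.apply(record)); // does not block
        }
        List<String> failed = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            try {
                futures.get(i).join(); // wait for this record's result
            } catch (CompletionException | CancellationException e) {
                failed.add(batch.get(i)); // only this record needs a retry
            }
        }
        return failed;
    }

    // Hypothetical stand-in for a send call: records starting with "bad" fail.
    static CompletableFuture<Long> fakeSend(String record) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        if (record.startsWith("bad")) {
            result.completeExceptionally(new RuntimeException("simulated send failure"));
        } else {
            result.complete(0L);
        }
        return result;
    }
}
```

The caller can then resend or save just the returned list, instead of
failing the whole batch and risking duplicates.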

Any thoughts on how you'd like it to work?

Gwen


On Mon, Feb 2, 2015 at 1:38 PM, Pradeep Gollakota <pr...@gmail.com> wrote:
> This is a great question Otis. Like Gwen said, you can accomplish Sync mode
> by setting the batch size to 1. But this does highlight a shortcoming of
> the new producer API.
>
> I really like the design of the new API and it has really great properties
> and I'm enjoying working with it. However, one API that I think we're
> lacking is a "batch" API. Currently, I have to iterate over a batch and
> call .send() on each record, which returns n callbacks instead of 1
> callback for the whole batch. This significantly complicates recovery logic
> where we need to commit a batch as opposed to 1 record at a time.
>
> Do you guys have any plans to add better semantics around batches?

Re: New Producer - ONLY sync mode?

Posted by Pradeep Gollakota <pr...@gmail.com>.
This is a great question Otis. Like Gwen said, you can accomplish Sync mode
by setting the batch size to 1. But this does highlight a shortcoming of
the new producer API.

I really like the design of the new API and it has really great properties
and I'm enjoying working with it. However, one API that I think we're
lacking is a "batch" API. Currently, I have to iterate over a batch and
call .send() on each record, which returns n callbacks instead of 1
callback for the whole batch. This significantly complicates recovery logic
where we need to commit a batch as opposed to 1 record at a time.
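
For illustration, one way to get a single result for a whole batch on the
caller's side is to combine the per-record futures. This is only a sketch
under assumptions: BatchFutureSketch, allSent and batchSucceeded are
hypothetical names, and CompletableFuture<Long> stands in for whatever the
producer's send() returns (the real client returns a plain Future, which
would need adapting first).

```java
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Sketch only: collapse n per-record futures into one per-batch future.
final class BatchFutureSketch {

    // Completes normally only if every per-record future completed normally.
    static CompletableFuture<Void> allSent(List<CompletableFuture<Long>> perRecord) {
        return CompletableFuture.allOf(perRecord.toArray(new CompletableFuture[0]));
    }

    // Blocks until the whole batch is resolved and reports a single outcome.
    static boolean batchSucceeded(List<CompletableFuture<Long>> perRecord) {
        try {
            allSent(perRecord).join();
            return true;
        } catch (CompletionException | CancellationException e) {
            return false; // at least one record failed
        }
    }
}
```

This gives one success/failure signal per batch, but it does not make the
batch atomic: some records may already be written when another one fails.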

Do you guys have any plans to add better semantics around batches?

On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira <gs...@cloudera.com> wrote:

> If I understood the code and Jay correctly - if you wait for the
> future it will be a similar delay to that of the old sync producer.
>
> Put another way, if you test it out and see longer delays than the
> sync producer had, we need to find out why and fix it.
>
> Gwen

Re: New Producer - ONLY sync mode?

Posted by Gwen Shapira <gs...@cloudera.com>.
If I understood the code and Jay correctly - if you wait for the
future it will be a similar delay to that of the old sync producer.

Put another way, if you test it out and see longer delays than the
sync producer had, we need to find out why and fix it.

Gwen

On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
<ot...@gmail.com> wrote:
> Hi,
>
> Nope, unfortunately it can't do that.  X is a remote app, doesn't listen to
> anything external, calls Y via HTTPS.  So X has to decide what to do with
> its data based on Y's synchronous response.  It has to block until Y
> responds.  And it wouldn't be pretty, I think, because nobody wants to run
> apps that talk to remote servers and hang on to connections more than they
> have to.  But perhaps that is the only way?  Or maybe the answer to "I'm
> guessing the delay would be more or less the same as if the Producer was
> using SYNC mode?" is YES, in which case the connection from X to Y would be
> open for just as long as with a SYNC producer running in Y?
>
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/

Re: New Producer - ONLY sync mode?

Posted by Otis Gospodnetic <ot...@gmail.com>.
Hi,

Nope, unfortunately it can't do that.  X is a remote app, doesn't listen to
anything external, calls Y via HTTPS.  So X has to decide what to do with
its data based on Y's synchronous response.  It has to block until Y
responds.  And it wouldn't be pretty, I think, because nobody wants to run
apps that talk to remote servers and hang on to connections more than they
have to.  But perhaps that is the only way?  Or maybe the answer to "I'm
guessing the delay would be more or less the same as if the Producer was
using SYNC mode?" is YES, in which case the connection from X to Y would be
open for just as long as with a SYNC producer running in Y?

Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira <gs...@cloudera.com> wrote:

> Can Y have a callback that will handle the notification to X?
> In this case, perhaps Y can be async and X can buffer the data until
> the callback triggers and says "all good" (or resend if the callback
> indicates an error)

Re: New Producer - ONLY sync mode?

Posted by Gwen Shapira <gs...@cloudera.com>.
Can Y have a callback that will handle the notification to X?
In this case, perhaps Y can be async and X can buffer the data until
the callback triggers and says "all good" (or resend if the callback
indicates an error)
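
A rough sketch of that callback idea, assuming Y has some way to push a
message back to X. CallbackNotifySketch and the outbox list are hypothetical,
and a CompletableFuture<Long> stands in for the result of the producer's
send(); the real client takes a callback argument on send() instead.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch only: react to the send result without blocking the caller.
final class CallbackNotifySketch {

    // Registers a hook that records what Y would tell X once the send resolves.
    static void notifyOnCompletion(CompletableFuture<Long> sendResult, List<String> outbox) {
        sendResult.whenComplete((offset, error) -> {
            // error == null means the record was acknowledged
            outbox.add(error == null ? "all good" : "resend");
        });
    }
}
```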

On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
<ot...@gmail.com> wrote:
> Hi,
>
> Thanks for the info.  Here's the use case.  We have something up stream
> sending data, say a log shipper called X.  It sends it to some remote
> component Y.  Y is the Kafka Producer and it puts data into Kafka.  But Y
> needs to send a reply to X and tell it whether it successfully put all its
> data into Kafka.  If it did not, Y wants to tell X to buffer data locally
> and resend it later.
>
> If producer is ONLY async, Y can't easily do that.  Or maybe Y would just
> need to wait for the Future to come back and only then send the response
> back to X?  If so, I'm guessing the delay would be more or less the same as
> if the Producer was using SYNC mode?
>
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/

Re: New Producer - ONLY sync mode?

Posted by Otis Gospodnetic <ot...@gmail.com>.
Hi,

Thanks for the info.  Here's the use case.  We have something up stream
sending data, say a log shipper called X.  It sends it to some remote
component Y.  Y is the Kafka Producer and it puts data into Kafka.  But Y
needs to send a reply to X and tell it whether it successfully put all its
data into Kafka.  If it did not, Y wants to tell X to buffer data locally
and resend it later.

If producer is ONLY async, Y can't easily do that.  Or maybe Y would just
need to wait for the Future to come back and only then send the response
back to X?  If so, I'm guessing the delay would be more or less the same as
if the Producer was using SYNC mode?
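
To make the "wait for the Future, then reply" option concrete, here is a
minimal sketch of what Y's request handler might do. BlockingReplySketch,
replyFor and the "OK" / "RETRY_LATER" strings are made up for illustration;
the Future<Long> stands in for the future returned by the producer's send().

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: turn an asynchronous send result into a synchronous reply.
final class BlockingReplySketch {

    // Blocks up to timeoutMs and returns the reply Y would send back to X.
    static String replyFor(Future<Long> sendResult, long timeoutMs) {
        try {
            sendResult.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "OK";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return "RETRY_LATER";
        } catch (ExecutionException | TimeoutException | CancellationException e) {
            return "RETRY_LATER"; // send failed or took too long
        }
    }
}
```

The timeout bounds how long the connection from X stays open when the send
is slow, at the cost of telling X to resend data that may still arrive.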

Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps <ja...@gmail.com> wrote:

> Yeah as Gwen says there is no sync/async mode anymore. There is a new
> configuration which does a lot of what async did in terms of allowing
> batching:
>
> batch.size - This is the target amount of data per partition the producer
> will attempt to batch together.
> linger.ms - This is the time the producer will wait for more data to be
> sent to better batch up writes. The default is 0 (send immediately). So if
> you set this to 50 ms the client will send immediately if it has already
> filled up its batch, otherwise it will wait up to 50 ms to accumulate the
> number of bytes given by batch.size.
>
> To send asynchronously you do
>    producer.send(record)
> whereas to block on a response you do
>    producer.send(record).get();
> which will wait for acknowledgement from the server.
>
> One advantage of this model is that the client will do its best to batch
> under the covers even if linger.ms=0. It will do this by batching any data
> that arrives while another send is in progress into a single
> request--giving a kind of "group commit" effect.
>
> The hope is that this will be both simpler to understand (a single api that
> always works the same) and more powerful (you always get a response with
> error and offset information whether or not you choose to use it).
>
> -Jay

Re: New Producer - ONLY sync mode?

Posted by Jay Kreps <ja...@gmail.com>.
Yeah as Gwen says there is no sync/async mode anymore. There is a new
configuration which does a lot of what async did in terms of allowing
batching:

batch.size - This is the target amount of data per partition the producer
will attempt to batch together.
linger.ms - This is the time the producer will wait for more data to be
sent to better batch up writes. The default is 0 (send immediately). So if
you set this to 50 ms the client will send immediately if it has already
filled up its batch, otherwise it will wait up to 50 ms to accumulate the
number of bytes given by batch.size.
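
As a small illustration, the two settings could be put together like this.
Only the keys batch.size, linger.ms and bootstrap.servers are real producer
config names; the class name, the helper and the broker address are
assumptions for the example.

```java
import java.util.Properties;

// Sketch only: builds the batching-related producer settings described above.
final class BatchingConfigSketch {

    static Properties batchingConfig(int batchSizeBytes, long lingerMs) {
        Properties props = new Properties();
        // Assumed broker address, purely illustrative.
        props.put("bootstrap.servers", "localhost:9092");
        // Target number of bytes to batch per partition.
        props.put("batch.size", Integer.toString(batchSizeBytes));
        // Maximum time to wait for a batch to fill before sending it.
        props.put("linger.ms", Long.toString(lingerMs));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(batchingConfig(16384, 50));
    }
}
```

The resulting Properties object would be passed to the producer's
constructor along with the serializer settings, which are left out here.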

To send asynchronously you do
   producer.send(record)
whereas to block on a response you do
   producer.send(record).get();
which will wait for acknowledgement from the server.

One advantage of this model is that the client will do its best to batch
under the covers even if linger.ms=0. It will do this by batching any data
that arrives while another send is in progress into a single
request--giving a kind of "group commit" effect.
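
A toy model of that "group commit" effect, to show the idea only. It is
single-threaded and assumes at most one request in flight at a time, which is
a simplification and not how the real client is implemented;
GroupCommitSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: records that arrive during an in-flight request share the next one.
final class GroupCommitSketch {
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> requests = new ArrayList<>();
    private boolean inFlight = false;

    // Called when the application hands over a record.
    void send(String record) {
        pending.add(record);
        if (!inFlight) {
            flushPending(); // nothing in flight, so send right away (linger = 0)
        }
    }

    // Called when the in-flight request has been acknowledged.
    void onResponse() {
        inFlight = false;
        if (!pending.isEmpty()) {
            flushPending(); // everything that queued up goes out as one request
        }
    }

    private void flushPending() {
        requests.add(new ArrayList<>(pending));
        pending.clear();
        inFlight = true;
    }

    static List<List<String>> demo() {
        GroupCommitSketch producer = new GroupCommitSketch();
        producer.send("a");    // sent alone as the first request
        producer.send("b");    // first request still in flight, so b waits
        producer.send("c");    // c waits too
        producer.onResponse(); // b and c go out together as the second request
        producer.onResponse();
        return producer.requests;
    }
}
```

Here three sends result in two requests, even though nothing ever waited on
a linger timer.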

The hope is that this will be both simpler to understand (a single api that
always works the same) and more powerful (you always get a response with
error and offset information whether or not you choose to use it).

-Jay


On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira <gs...@cloudera.com> wrote:

> If you want to emulate the old sync producer behavior, you need to set
> the batch size to 1 (in producer config) and wait on the future you
> get from send() (i.e. future.get())
>
> I can't think of good reasons to do so, though.
>
> Gwen

Re: New Producer - ONLY sync mode?

Posted by Gwen Shapira <gs...@cloudera.com>.
If you want to emulate the old sync producer behavior, you need to set
the batch size to 1 (in producer config) and wait on the future you
get from send() (i.e. future.get())

I can't think of good reasons to do so, though.

Gwen


On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic
<ot...@gmail.com> wrote:
> Hi,
>
> Is the plan for New Producer to have ONLY async mode?  I'm asking because
> of this info from the Wiki:
>
>
>    - The producer will always attempt to batch data and will always
>    immediately return a SendResponse which acts as a Future to allow the
>    client to await the completion of the request.
>
>
> The word "always" makes me think there will be no sync mode.
>
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/