Posted to users@kafka.apache.org by Roshan Naik <ro...@hortonworks.com> on 2015/04/27 22:19:40 UTC

New Producer API - batched sync mode support

Been evaluating the perf of the old and new Producer APIs for reliable high-volume streaming data movement. I do see one area of improvement that the new API could use for synchronous clients.

AFAICT, the new API does not support batched synchronous transfers. To do a synchronous send, one needs to call future.get() after every Producer.send(). I changed the new o.a.k.clients.tools.ProducerPerformance tool to assess the perf of this mode of operation. It may not be surprising that it is much slower than the async mode... hard to push it beyond 4MB/s.

The 0.8.1 Scala based producer API supported a batched sync mode via Producer.send( List<KeyedMessage> ) . My measurements show that it was able to approach (and sometimes exceed) the old async speeds... 266MB/s


Supporting this batched sync mode is critical for streaming clients (such as Flume, for example) that need delivery guarantees. Although it can be done with async mode, that requires additional bookkeeping as to which events are delivered and which ones are not. The programming model becomes much simpler with the batched sync mode. Having the client deal with a single future.get() also helps performance greatly, as I noted.
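
To make the two call patterns concrete, here is a minimal, self-contained sketch. It uses only JDK classes; `FakeProducer` and `SyncModes` are hypothetical stand-ins invented for this example (the fake `send` returns a future that a background thread completes), so it says nothing about Kafka's actual throughput. It only shows where the blocking happens: once per record in the first method, and only after the whole batch has been handed over in the second.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a producer: send() returns immediately and the
// returned future is completed asynchronously by a single background thread.
class FakeProducer implements AutoCloseable {
    private final ExecutorService io = Executors.newSingleThreadExecutor();

    Future<Long> send(String record) {
        // Pretend the "offset" is the record length; a real producer returns metadata.
        return CompletableFuture.supplyAsync(() -> (long) record.length(), io);
    }

    @Override
    public void close() {
        io.shutdown();
    }
}

class SyncModes {
    // Per-record sync: block on every future before sending the next record.
    static int sendOneByOne(FakeProducer producer, List<String> records) {
        int acked = 0;
        for (String r : records) {
            await(producer.send(r));
            acked++;
        }
        return acked;
    }

    // Batched sync: hand over the whole batch first, then wait for all results.
    static int sendBatch(FakeProducer producer, List<String> records) {
        List<Future<Long>> futures = new ArrayList<>();
        for (String r : records) {
            futures.add(producer.send(r));
        }
        int acked = 0;
        for (Future<Long> f : futures) {
            await(f);
            acked++;
        }
        return acked;
    }

    // Blocks like future.get(), but rethrows failures as unchecked exceptions.
    private static long await(Future<Long> f) {
        try {
            return f.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

In the batched variant the caller still has to walk the list of futures, which is the bookkeeping cost mentioned above.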

Wanted to propose adding this as an enhancement to the new Producer API.

Re: New Producer API - batched sync mode support

Posted by Gwen Shapira <gs...@cloudera.com>.
I should have been clearer - I used Roshan's terminology in my reply.

Basically, the old producer "batch" Send() just took a sequence of
messages. I assumed Roshan is looking for something similar - which allows
for mixing messages for multiple partitions and therefore can fail for some
messages and succeed for others.

This is unrelated to MessageSet, which is for a specific partition and
indeed fails or succeeds as a whole.

For completeness - the internal RecordAccumulator component of the new
KafkaProducer does manage a separate batch for each partition, and these
batches should succeed or fail as a whole. I'm not sure I want to expose
this level of implementation detail in our API though.

Gwen


On Mon, Apr 27, 2015 at 2:36 PM, Magnus Edenhill <ma...@edenhill.se> wrote:

> Hi Gwen,
>
> can you clarify: by batch do you mean the protocol MessageSet, or some java
> client internal construct?
> If the former I was under the impression that a produced MessageSet either
> succeeds delivery or errors in its entirety on the broker.
>
> Thanks,
> Magnus
>
>
> 2015-04-27 23:05 GMT+02:00 Gwen Shapira <gs...@cloudera.com>:
>
> > Batch failure is a bit meaningless, since in the same batch, some records
> > can succeed and others may fail.
> > To implement an error handling logic (usually different than retry, since
> > the producer has a configuration controlling retries), we recommend using
> > the callback option of Send().
> >
> > Gwen
> >
> > P.S
> > Awesome seeing you here, Roshan :)
> >
> > On Mon, Apr 27, 2015 at 1:53 PM, Roshan Naik <ro...@hortonworks.com>
> > wrote:
> >
> > > The important guarantee that is needed for a client producer thread is
> > > that it requires an indication of success/failure of the batch of
> events
> > > it pushed. Essentially it needs to retry producer.send() on that same
> > > batch in case of failure. My understanding is that flush will simply
> > flush
> > > data from all threads (correct me if I am wrong).
> > >
> > > -roshan
> > >
> > >
> > >
> > > On 4/27/15 1:36 PM, "Joel Koshy" <jj...@gmail.com> wrote:
> > >
> > > >This sounds like flush:
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+meth
> > > >od+to+the+producer+API
> > > >
> > > >which was recently implemented in trunk.
> > > >
> > > >Joel
> > > >
> > > >On Mon, Apr 27, 2015 at 08:19:40PM +0000, Roshan Naik wrote:
> > > >> Been evaluating the perf of old and new Produce APIs for reliable
> high
> > > >>volume streaming data movement. I do see one area of improvement that
> > > >>the new API could use for synchronous clients.
> > > >>
> > > >> AFAIKT, the new API does not support batched synchronous transfers.
> To
> > > >>do synchronous send, one needs to do a future.get() after every
> > > >>Producer.send(). I changed the new
> > > >>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of
> this
> > > >>mode of operation. May not be surprising that it much slower than the
> > > >>async mode... hard t push it beyond 4MB/s.
> > > >>
> > > >> The 0.8.1 Scala based producer API supported a batched sync mode via
> > > >>Producer.send( List<KeyedMessage> ) . My measurements show that it
> was
> > > >>able to approach (and sometimes exceed) the old async speeds...
> 266MB/s
> > > >>
> > > >>
> > > >> Supporting this batched sync mode is very critical for streaming
> > > >>clients (such as flume for example) that need delivery guarantees.
> > > >>Although it can be done with Async mode, it requires additional book
> > > >>keeping as to which events are delivered and which ones are not. The
> > > >>programming model becomes much simpler with the batched sync mode.
> > > >>Client having to deal with one single future.get() helps performance
> > > >>greatly too as I noted.
> > > >>
> > > >> Wanted to propose adding this as an enhancement to the new Producer
> > API.
> > > >
> > >
> > >
> >
>

Re: New Producer API - batched sync mode support

Posted by Magnus Edenhill <ma...@edenhill.se>.
Hi Gwen,

can you clarify: by batch do you mean the protocol MessageSet, or some java
client internal construct?
If the former I was under the impression that a produced MessageSet either
succeeds delivery or errors in its entirety on the broker.

Thanks,
Magnus


2015-04-27 23:05 GMT+02:00 Gwen Shapira <gs...@cloudera.com>:

> Batch failure is a bit meaningless, since in the same batch, some records
> can succeed and others may fail.
> To implement an error handling logic (usually different than retry, since
> the producer has a configuration controlling retries), we recommend using
> the callback option of Send().
>
> Gwen
>
> P.S
> Awesome seeing you here, Roshan :)
>
> On Mon, Apr 27, 2015 at 1:53 PM, Roshan Naik <ro...@hortonworks.com>
> wrote:
>
> > The important guarantee that is needed for a client producer thread is
> > that it requires an indication of success/failure of the batch of events
> > it pushed. Essentially it needs to retry producer.send() on that same
> > batch in case of failure. My understanding is that flush will simply
> flush
> > data from all threads (correct me if I am wrong).
> >
> > -roshan
> >
> >
> >
> > On 4/27/15 1:36 PM, "Joel Koshy" <jj...@gmail.com> wrote:
> >
> > >This sounds like flush:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+meth
> > >od+to+the+producer+API
> > >
> > >which was recently implemented in trunk.
> > >
> > >Joel
> > >
> > >On Mon, Apr 27, 2015 at 08:19:40PM +0000, Roshan Naik wrote:
> > >> Been evaluating the perf of old and new Produce APIs for reliable high
> > >>volume streaming data movement. I do see one area of improvement that
> > >>the new API could use for synchronous clients.
> > >>
> > >> AFAIKT, the new API does not support batched synchronous transfers. To
> > >>do synchronous send, one needs to do a future.get() after every
> > >>Producer.send(). I changed the new
> > >>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this
> > >>mode of operation. May not be surprising that it much slower than the
> > >>async mode... hard t push it beyond 4MB/s.
> > >>
> > >> The 0.8.1 Scala based producer API supported a batched sync mode via
> > >>Producer.send( List<KeyedMessage> ) . My measurements show that it was
> > >>able to approach (and sometimes exceed) the old async speeds... 266MB/s
> > >>
> > >>
> > >> Supporting this batched sync mode is very critical for streaming
> > >>clients (such as flume for example) that need delivery guarantees.
> > >>Although it can be done with Async mode, it requires additional book
> > >>keeping as to which events are delivered and which ones are not. The
> > >>programming model becomes much simpler with the batched sync mode.
> > >>Client having to deal with one single future.get() helps performance
> > >>greatly too as I noted.
> > >>
> > >> Wanted to propose adding this as an enhancement to the new Producer
> API.
> > >
> >
> >
>

Re: New Producer API - batched sync mode support

Posted by Gwen Shapira <gs...@cloudera.com>.
Batch failure is a bit meaningless, since in the same batch, some records
can succeed and others may fail.
To implement an error handling logic (usually different than retry, since
the producer has a configuration controlling retries), we recommend using
the callback option of Send().

Gwen

P.S
Awesome seeing you here, Roshan :)

On Mon, Apr 27, 2015 at 1:53 PM, Roshan Naik <ro...@hortonworks.com> wrote:

> The important guarantee that is needed for a client producer thread is
> that it requires an indication of success/failure of the batch of events
> it pushed. Essentially it needs to retry producer.send() on that same
> batch in case of failure. My understanding is that flush will simply flush
> data from all threads (correct me if I am wrong).
>
> -roshan
>
>
>
> On 4/27/15 1:36 PM, "Joel Koshy" <jj...@gmail.com> wrote:
>
> >This sounds like flush:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+meth
> >od+to+the+producer+API
> >
> >which was recently implemented in trunk.
> >
> >Joel
> >
> >On Mon, Apr 27, 2015 at 08:19:40PM +0000, Roshan Naik wrote:
> >> Been evaluating the perf of old and new Produce APIs for reliable high
> >>volume streaming data movement. I do see one area of improvement that
> >>the new API could use for synchronous clients.
> >>
> >> AFAIKT, the new API does not support batched synchronous transfers. To
> >>do synchronous send, one needs to do a future.get() after every
> >>Producer.send(). I changed the new
> >>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this
> >>mode of operation. May not be surprising that it much slower than the
> >>async mode... hard t push it beyond 4MB/s.
> >>
> >> The 0.8.1 Scala based producer API supported a batched sync mode via
> >>Producer.send( List<KeyedMessage> ) . My measurements show that it was
> >>able to approach (and sometimes exceed) the old async speeds... 266MB/s
> >>
> >>
> >> Supporting this batched sync mode is very critical for streaming
> >>clients (such as flume for example) that need delivery guarantees.
> >>Although it can be done with Async mode, it requires additional book
> >>keeping as to which events are delivered and which ones are not. The
> >>programming model becomes much simpler with the batched sync mode.
> >>Client having to deal with one single future.get() helps performance
> >>greatly too as I noted.
> >>
> >> Wanted to propose adding this as an enhancement to the new Producer API.
> >
>
>

Re: New Producer API - batched sync mode support

Posted by Greg Lloyd <gl...@gmail.com>.
@Shapira You are correct from my perspective. We are using Kafka for a
system where panels can send multiple events in a single message. The
current contract is such that all events fail or succeed as a whole. If
there is a failure the panel resends all the events. The existing producer
API supports this fine. Am I getting left behind here for the sake of
brevity?

I can get behind not adding every feature people ask for, but taking away
something is a different story altogether.

On Wed, Apr 29, 2015 at 9:08 PM, Gwen Shapira <gs...@cloudera.com> wrote:

> I'm starting to think that the old adage "If two people say you are drunk,
> lie down" applies here :)
>
> Current API seems perfectly clear, useful and logical to everyone who wrote
> it... but we are getting multiple users asking for the old batch behavior
> back.
> One reason to get it back is to make upgrades easier - people won't need to
> rethink their existing logic if they get an API with the same behavior in
> the new producer. The other reason is what Ewen mentioned earlier - if
> everyone re-implements Joel's logic, we can provide something for that.
>
> How about getting the old batch send behavior back by adding a new API
> with:
> public void batchSend(List<ProducerRecord<K,V>>)
>
> With this implementation (mixes the old behavior with Joel's snippet):
> * send records one by one
> * flush
> * iterate on futures and "get" them
> * log a detailed message on each error
> * throw an exception if any send failed.
>
> It reproduces the old behavior - which apparently everyone really liked,
> and I don't think it is overly weird. It is very limited, but anyone who
> needs more control over his sends already have plenty of options.
>
> Thoughts?
>
> Gwen
>
>
>
>
> On Tue, Apr 28, 2015 at 5:29 PM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Hey guys,
> >
> > The locking argument is correct for very small records (< 50 bytes),
> > batching will help here because for small records locking becomes the big
> > bottleneck. I think these use cases are rare but not unreasonable.
> >
> > Overall I'd emphasize that the new producer is way faster at virtually
> all
> > use cases. If there is a use case where that isn't true, let's look at it
> > in a data driven way by comparing the old producer to the new producer
> and
> > looking for any areas where things got worse.
> >
> > I suspect the "reducing allocations" argument to be not a big thing. We
> do
> > a number of small per-message allocations and it didn't seem to have much
> > impact. I do think there are a couple of big producer memory
> optimizations
> > we could do by reusing the arrays in the accumulator in the serialization
> > of the request but I don't think this is one of them.
> >
> > I'd be skeptical of any api that was too weird--i.e. introduces a new way
> > of partitioning, gives back errors on a per-partition rather than per
> > message basis (given that partitioning is transparent this is really hard
> > to think about), etc. Bad apis end up causing a ton of churn and just
> don't
> > end up being a good long term commitment as we change how the underlying
> > code works over time (i.e. we hyper optimize for something then have to
> > maintain some super weird api as it becomes hyper unoptimized for the
> > client over time).
> >
> > Roshan--Flush works as you would hope, it blocks on the completion of all
> > outstanding requests. Calling get on the future for the request gives you
> > the associated error code back. Flush doesn't throw any exceptions
> because
> > waiting for requests to complete doesn't error, the individual requests
> > fail or succeed which is always reported with each request.
> >
> > Ivan--The batches you send in the scala producer today actually aren't
> > truly atomic, they just get sent in a single request.
> >
> > One tricky problem to solve when users do batching is size limits on
> > requests. This can be very hard to manage since predicting the serialized
> > size of a bunch of java objects is not always obvious. This was
> repeatedly
> > a problem before.
> >
> > -Jay
> >
> > On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov <ib...@gmail.com>
> > wrote:
> >
> > > I must agree with @Roshan – it's hard to imagine anything more
> intuitive
> > > and easy to use for atomic batching as old sync batch api. Also, it's
> > fast.
> > > Coupled with a separate instance of producer per
> > > broker:port:topic:partition it works very well. I would be glad if it
> > finds
> > > its way into new producer api.
> > >
> > > On a side-side-side note, could anyone confirm/deny if SimpleConsumer's
> > > fetchSize must be set at least as batch bytes (before or after
> > > compression), otherwise client risks not getting any messages?
> > >
> >
>

Re: New Producer API - batched sync mode support

Posted by Jay Kreps <ja...@gmail.com>.
Gwen, I don't care what anyone says I think we are totally stlone cold
slobar. :-)

I think the only caution I would have is that in general people ask for
many things and yet the systems we all admire tend to keep their surface
area really really simple. My observation is that never in the history of
working on open source has anyone ever asked for simplicity or agitated for
removing features, but people really do value that. So I think it is worth
trying to really get down to the core problem that the api solves and avoid
adding to it unless there is a really clear case.

Here are the things I have understood:

a. The performance of batching manually could be better due to locking
around a batch. This is possible, but I think it would be good to do a
quick measurement between the new and old producer and see if this really
plays out or not and the magnitude of the performance improvement we could
achieve. There were several other perf arguments aside from locking that
seemed unlikely to me, but I think a quick measurement could clear all this
up.

b. It would be nice to have some batch-level atomicity. I agree, but I
think this is the cross-partition transaction work. Batching can't really
guarantee this (and didn't before) and I think this is one where getting
almost what you need (but not quite working) is worse than nothing cause
you can't depend on it.

c. The code for looping over responses is annoying. I think this is true,
but I think if you want to give back the offset and error per message you
kind of end up with something like the futures. You could imagine some api
that sends a list of messages and returns a Map of errors or something but
it is a little special purpose since if you don't care about the error you
don't need any special api and if you care about the offset that won't
help...so I think to do this really well we need to maybe write down the N
patterns of producer usage and see which ones we can improve with a new api.
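
As a rough illustration of the "send a list, get back a Map of errors" idea in (c), the sketch below models it with JDK classes only. `BatchErrors`, `sendAll`, and the `Function` standing in for the producer's send are hypothetical names invented for this example, not part of any Kafka API. It returns the index of each failed record mapped to its exception, which also shows the limitation described in (c): the offsets of the successful sends are thrown away.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

class BatchErrors {
    // Sends every record, waits for all results, and returns index -> error
    // for the records that failed. An empty map means the whole batch succeeded.
    static Map<Integer, Throwable> sendAll(
            Function<String, CompletableFuture<Long>> send, List<String> records) {
        List<CompletableFuture<Long>> futures = new ArrayList<>();
        for (String r : records) {
            futures.add(send.apply(r));
        }
        Map<Integer, Throwable> errors = new TreeMap<>();
        for (int i = 0; i < futures.size(); i++) {
            try {
                futures.get(i).join(); // the offset of a successful send is discarded here
            } catch (CompletionException e) {
                errors.put(i, e.getCause());
            }
        }
        return errors;
    }
}
```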

For what it is worth I think a lot of this is just because people were used
to the scala API. However the scala api also caused endless confusion
because of the weird mixture of manual and automatic batching (what is the
difference? when to use one or the other? how do they interact? etc.).

-Jay

On Wed, Apr 29, 2015 at 6:08 PM, Gwen Shapira <gs...@cloudera.com> wrote:

> I'm starting to think that the old adage "If two people say you are drunk,
> lie down" applies here :)
>
> Current API seems perfectly clear, useful and logical to everyone who wrote
> it... but we are getting multiple users asking for the old batch behavior
> back.
> One reason to get it back is to make upgrades easier - people won't need to
> rethink their existing logic if they get an API with the same behavior in
> the new producer. The other reason is what Ewen mentioned earlier - if
> everyone re-implements Joel's logic, we can provide something for that.
>
> How about getting the old batch send behavior back by adding a new API
> with:
> public void batchSend(List<ProducerRecord<K,V>>)
>
> With this implementation (mixes the old behavior with Joel's snippet):
> * send records one by one
> * flush
> * iterate on futures and "get" them
> * log a detailed message on each error
> * throw an exception if any send failed.
>
> It reproduces the old behavior - which apparently everyone really liked,
> and I don't think it is overly weird. It is very limited, but anyone who
> needs more control over his sends already have plenty of options.
>
> Thoughts?
>
> Gwen
>
>
>
>
> On Tue, Apr 28, 2015 at 5:29 PM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Hey guys,
> >
> > The locking argument is correct for very small records (< 50 bytes),
> > batching will help here because for small records locking becomes the big
> > bottleneck. I think these use cases are rare but not unreasonable.
> >
> > Overall I'd emphasize that the new producer is way faster at virtually
> all
> > use cases. If there is a use case where that isn't true, let's look at it
> > in a data driven way by comparing the old producer to the new producer
> and
> > looking for any areas where things got worse.
> >
> > I suspect the "reducing allocations" argument to be not a big thing. We
> do
> > a number of small per-message allocations and it didn't seem to have much
> > impact. I do think there are a couple of big producer memory
> optimizations
> > we could do by reusing the arrays in the accumulator in the serialization
> > of the request but I don't think this is one of them.
> >
> > I'd be skeptical of any api that was too weird--i.e. introduces a new way
> > of partitioning, gives back errors on a per-partition rather than per
> > message basis (given that partitioning is transparent this is really hard
> > to think about), etc. Bad apis end up causing a ton of churn and just
> don't
> > end up being a good long term commitment as we change how the underlying
> > code works over time (i.e. we hyper optimize for something then have to
> > maintain some super weird api as it becomes hyper unoptimized for the
> > client over time).
> >
> > Roshan--Flush works as you would hope, it blocks on the completion of all
> > outstanding requests. Calling get on the future for the request gives you
> > the associated error code back. Flush doesn't throw any exceptions
> because
> > waiting for requests to complete doesn't error, the individual requests
> > fail or succeed which is always reported with each request.
> >
> > Ivan--The batches you send in the scala producer today actually aren't
> > truly atomic, they just get sent in a single request.
> >
> > One tricky problem to solve when users do batching is size limits on
> > requests. This can be very hard to manage since predicting the serialized
> > size of a bunch of java objects is not always obvious. This was
> repeatedly
> > a problem before.
> >
> > -Jay
> >
> > On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov <ib...@gmail.com>
> > wrote:
> >
> > > I must agree with @Roshan – it's hard to imagine anything more
> intuitive
> > > and easy to use for atomic batching as old sync batch api. Also, it's
> > fast.
> > > Coupled with a separate instance of producer per
> > > broker:port:topic:partition it works very well. I would be glad if it
> > finds
> > > its way into new producer api.
> > >
> > > On a side-side-side note, could anyone confirm/deny if SimpleConsumer's
> > > fetchSize must be set at least as batch bytes (before or after
> > > compression), otherwise client risks not getting any messages?
> > >
> >
>

Re: New Producer API - batched sync mode support

Posted by Jiangjie Qin <jq...@linkedin.com.INVALID>.
Roshan,

If I understand correctly, you just want to make sure a number of messages
have been sent successfully. Using a callback might be an easier way to do so.

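import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyCallback implements Callback {
	// Callbacks run on the producer's I/O thread, so use a thread-safe collection.
	// Store the exception: metadata may be null (or carry no offset) for a failed send.
	public final Queue<Exception> failedSend = new ConcurrentLinkedQueue<Exception>();

	@Override
	public void onCompletion(RecordMetadata metadata, Exception exception) {
		if (exception != null)
			failedSend.add(exception);
	}

	public boolean hasFailure() { return !failedSend.isEmpty(); }
}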

In main code, you just need to do the following:
{
MyCallback callback = new MyCallback();
for (ProducerRecord<K, V> record : records)
	producer.send(record, callback);

producer.flush();
if (callback.hasFailure()) {
	// do something, e.g. resend the batch
}
}

This will avoid the loop checking and provide you pretty much the same
guarantee as the old producer, if not better.

Jiangjie (Becket) Qin
	

On 4/30/15, 4:54 PM, "Roshan Naik" <ro...@hortonworks.com> wrote:

>@Gwen, @Ewen,
>  While atomicity of a batch is nice to have, it is not essential. I don't
>think users always expect such atomicity. Atomicity is not even guaranteed
>in many un-batched systems let alone batched systems.
>
>As long as the client gets informed about the ones that failed in the
>batch.. that would suffice.
>
>One issue with the current flush() based batch-sync implementation is that
>the client needs to iterate over *all* futures in order to scan for any
>failed messages. In the common case, it is just wasted CPU cycles as there
>won't be any failures. Would be ideal if the client is informed about only
>problematic messages.
>
>  IMO, adding a new send(batch) API may be meaningful if it can provide
>benefits beyond what user can do with a simple wrapper on existing stuff.
>For example: eliminate the CPU cycles wasted on examining results from
>successful message deliveries, or other efficiencies.
>
>
>
>@Ivan,
>   I am not certain, I am thinking that there is a possibility that the
>first few messages of the batch got accepted, but not the remainder ? At
>the same time based on some comments made earlier it appears underlying
>implementation does have an all-or-none mechanism for a batch going to a
>partition.
>For simplicity, streaming clients may not want to deal explicitly with
>partitions (and get exposed to repartitioning & leader change type issues)
>
>-roshan
>
>
>
>On 4/30/15 2:07 PM, "Gwen Shapira" <gs...@cloudera.com> wrote:
>
>>Why do we think atomicity is expected, if the old API we are emulating
>>here
>>lacks atomicity?
>>
>>I don't remember emails to the mailing list saying: "I expected this
>>batch
>>to be atomic, but instead I got duplicates when retrying after a failed
>>batch send".
>>Maybe atomicity isn't as strong requirement as we believe? That is,
>>everyone expects some duplicates during failure events and handles them
>>downstream?
>>
>>
>>
>>On Thu, Apr 30, 2015 at 2:02 PM, Ivan Balashov <ib...@gmail.com>
>>wrote:
>>
>>> 2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava <ew...@confluent.io>:
>>>
>>> > They aren't going to get this anyway (as Jay pointed out) given the
>>> current
>>> > broker implementation
>>> >
>>>
>>> Is it also incorrect to assume atomicity even if all messages in the
>>>batch
>>> go to the same partition?
>>>
>


Re: New Producer API - batched sync mode support

Posted by Roshan Naik <ro...@hortonworks.com>.
@Gwen, @Ewen,
  While atomicity of a batch is nice to have, it is not essential. I don't
think users always expect such atomicity. Atomicity is not even guaranteed
in many un-batched systems let alone batched systems.

As long as the client gets informed about the ones that failed in the
batch.. that would suffice.

One issue with the current flush() based batch-sync implementation is that
the client needs to iterate over *all* futures in order to scan for any
failed messages. In the common case, it is just wasted CPU cycles as there
won't be any failures. Would be ideal if the client is informed about only
problematic messages.

  IMO, adding a new send(batch) API may be meaningful if it can provide
benefits beyond what a user can do with a simple wrapper on existing stuff.
For example: eliminating the CPU cycles wasted on examining results from
successful message deliveries, or other efficiencies.
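
One way to avoid scanning every future, sketched here with JDK classes only, is to attach a completion callback per record and have it remember the record only when the send failed. `FailureCollector` and its methods are hypothetical names for this example. With the real producer the equivalent hook would be the callback argument of send(); this sketch models it with `CompletableFuture.whenComplete`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

class FailureCollector<R> {
    // Thread-safe because completions may run on a different thread than the sender.
    private final Queue<R> failed = new ConcurrentLinkedQueue<>();

    // Registers a callback that remembers the record only if its send failed.
    <T> CompletableFuture<T> track(R record, CompletableFuture<T> sendResult) {
        return sendResult.whenComplete((ok, error) -> {
            if (error != null) {
                failed.add(record);
            }
        });
    }

    // Records whose sends have failed so far; successful sends leave no trace here.
    List<R> failedRecords() {
        return new ArrayList<>(failed);
    }
}
```

The result is only meaningful once every tracked send has completed, so a caller would read `failedRecords()` after something like flush() has returned.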



@Ivan,
   I am not certain, but I think it is possible that the first few
messages of the batch get accepted while the remainder are not. At
the same time, based on some comments made earlier, it appears the underlying
implementation does have an all-or-none mechanism for a batch going to a
partition.
For simplicity, streaming clients may not want to deal explicitly with
partitions (and get exposed to repartitioning & leader change type issues).

-roshan



On 4/30/15 2:07 PM, "Gwen Shapira" <gs...@cloudera.com> wrote:

>Why do we think atomicity is expected, if the old API we are emulating
>here
>lacks atomicity?
>
>I don't remember emails to the mailing list saying: "I expected this batch
>to be atomic, but instead I got duplicates when retrying after a failed
>batch send".
>Maybe atomicity isn't as strong requirement as we believe? That is,
>everyone expects some duplicates during failure events and handles them
>downstream?
>
>
>
>On Thu, Apr 30, 2015 at 2:02 PM, Ivan Balashov <ib...@gmail.com>
>wrote:
>
>> 2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava <ew...@confluent.io>:
>>
>> > They aren't going to get this anyway (as Jay pointed out) given the
>> current
>> > broker implementation
>> >
>>
>> Is it also incorrect to assume atomicity even if all messages in the
>>batch
>> go to the same partition?
>>


Re: New Producer API - batched sync mode support

Posted by Gwen Shapira <gs...@cloudera.com>.
Why do we think atomicity is expected, if the old API we are emulating here
lacks atomicity?

I don't remember emails to the mailing list saying: "I expected this batch
to be atomic, but instead I got duplicates when retrying after a failed
batch send".
Maybe atomicity isn't as strong a requirement as we believe? That is,
everyone expects some duplicates during failure events and handles them
downstream?



On Thu, Apr 30, 2015 at 2:02 PM, Ivan Balashov <ib...@gmail.com> wrote:

> 2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava <ew...@confluent.io>:
>
> > They aren't going to get this anyway (as Jay pointed out) given the
> current
> > broker implementation
> >
>
> Is it also incorrect to assume atomicity even if all messages in the batch
> go to the same partition?
>

Re: New Producer API - batched sync mode support

Posted by Ivan Balashov <ib...@gmail.com>.
2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava <ew...@confluent.io>:

> They aren't going to get this anyway (as Jay pointed out) given the current
> broker implementation
>

Is it also incorrect to assume atomicity even if all messages in the batch
go to the same partition?

Re: New Producer API - batched sync mode support

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
On Wed, Apr 29, 2015 at 6:08 PM, Gwen Shapira <gs...@cloudera.com> wrote:

> I'm starting to think that the old adage "If two people say you are drunk,
> lie down" applies here :)
>
> Current API seems perfectly clear, useful and logical to everyone who wrote
> it... but we are getting multiple users asking for the old batch behavior
> back.
> One reason to get it back is to make upgrades easier - people won't need to
> rethink their existing logic if they get an API with the same behavior in
> the new producer. The other reason is what Ewen mentioned earlier - if
> everyone re-implements Joel's logic, we can provide something for that.
>
> How about getting the old batch send behavior back by adding a new API
> with:
> public void batchSend(List<ProducerRecord<K,V>>)
>
> With this implementation (mixes the old behavior with Joel's snippet):
> * send records one by one
> * flush
> * iterate on futures and "get" them
> * log a detailed message on each error
> * throw an exception if any send failed.
>
> It reproduces the old behavior - which apparently everyone really liked,
> and I don't think it is overly weird. It is very limited, but anyone who
> needs more control over his sends already have plenty of options.
>

First, I'll just say that I actually prefer a smaller API -- most wrappers
are trivial and if someone reuses them *that* often, they can always package
them into a wrapper API. Implementing everything people repeat can be a
slippery slope towards including every pattern under the sun. I'm
definitely skeptical of adding to the core API if there isn't something
that requires a specialized implementation. If it works as a trivial
wrapper that people want in Kafka proper, then it'd probably be better
placed in a contrib package or implemented as a separate utility wrapper
class that guarantees isolation from the main producer implementation.

Second, one of the reasons I mentioned alternative APIs is that I think the
behavior you'd get with this API is fairly confusing and hard to work with
given the behavior I *think* people are actually looking for. I suspect the
reason they want synchronous batch behavior is to get atomic batch writes.
They aren't going to get this anyway (as Jay pointed out) given the current
broker implementation, but you could get a close approximation if you have
max 1 in flight request and retry indefinitely (with the retries going to
the front of the queue to avoid out-of-order writes). That has the drawback
that failures never bubble up. Alternatively, you need to be able to
immediately clear the buffered records upon a failure.
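
A minimal sketch of the "max 1 in flight request and retry indefinitely" setup described above, written as plain producer properties. The key names are the Java producer settings as I understand them and should be checked against the documentation for your client version; the class name OrderedRetryConfig is made up for illustration. Where retried requests land in the queue is internal producer behavior and is not something these settings control.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: collects the settings discussed above.
final class OrderedRetryConfig {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // At most one unacknowledged request per broker connection, so a
        // retried request cannot be overtaken by a later one.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        // Effectively "retry indefinitely". The drawback noted above applies:
        // failures never bubble up to the caller.
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        return props;
    }
}
```

The resulting Properties object would be handed to the producer constructor along with the usual serializer settings.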

I mentioned the fwrite()-like API because it lets you expose semantics like
this in a way I think can be pretty clean -- give the producer a list of
records and it will use as many as it can while a) respecting the buffer
restrictions and b) ensuring that all the accepted records will be
included in the same request to the broker. You're guaranteed that there
are no "extra" records in the accumulator if something fails, which lets
you get an error back without the possibility of message reordering,
allowing you to handle the issue however you like. (This still has a ton of
issues if you introduce client-side timeouts, which is why
https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer would
be the real solution.)
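
To make the fwrite()-like idea concrete, here is a small sketch of the calling convention only. BoundedBatch and offer() are hypothetical names, not anything in the Kafka client, and the byte-capacity check stands in for whatever buffer restrictions the real producer would enforce.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-request buffer; illustrates the "short count" contract.
final class BoundedBatch {
    private final int capacityBytes;
    private int usedBytes = 0;
    private final List<byte[]> accepted = new ArrayList<>();

    BoundedBatch(int capacityBytes) {
        this.capacityBytes = capacityBytes;
    }

    // Accepts the longest prefix of `records` that fits in the remaining
    // space and returns how many were taken, like fwrite() returning a
    // short count. Nothing beyond that prefix is buffered.
    int offer(List<byte[]> records) {
        int taken = 0;
        for (byte[] r : records) {
            if (usedBytes + r.length > capacityBytes) {
                break; // stop at the first record that does not fit
            }
            accepted.add(r);
            usedBytes += r.length;
            taken++;
        }
        return taken;
    }

    int size() {
        return accepted.size();
    }
}
```

A caller would resubmit records.subList(taken, records.size()) once the accepted prefix has succeeded, so no "extra" records are ever sitting in the buffer when a failure is reported.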

I was also sensitive to the possible performance issues because I was
recently bitten by an issue where some code was unexpectedly processing
characters one at a time with really awful performance as a result, and a
small bit of batching solved the problem :) But here, I agree with Jay that
there's enough other stuff going on under the hood that this alone isn't
likely to have a huge impact. Then again, I'd love to see some profiler
numbers from a semi-realistic workload!

On clarity and ease of use of APIs, and coming back to the behavior I
think people are looking for -- sometimes the challenge isn't just
documenting the API you've concluded is the right one, but also explaining
why the API everyone seems to want can't work/was broken/doesn't provide
the guarantees they thought it did.


> Thoughts?
>
> Gwen
>
>
>
>
> On Tue, Apr 28, 2015 at 5:29 PM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Hey guys,
> >
> > The locking argument is correct for very small records (< 50 bytes),
> > batching will help here because for small records locking becomes the big
> > bottleneck. I think these use cases are rare but not unreasonable.
> >
> > Overall I'd emphasize that the new producer is way faster at virtually
> all
> > use cases. If there is a use case where that isn't true, let's look at it
> > in a data driven way by comparing the old producer to the new producer
> and
> > looking for any areas where things got worse.
> >
> > I suspect the "reducing allocations" argument to be not a big thing. We
> do
> > a number of small per-message allocations and it didn't seem to have much
> > impact. I do think there are a couple of big producer memory
> optimizations
> > we could do by reusing the arrays in the accumulator in the serialization
> > of the request but I don't think this is one of them.
> >
> > I'd be skeptical of any api that was too weird--i.e. introduces a new way
> > of partitioning, gives back errors on a per-partition rather than per
> > message basis (given that partitioning is transparent this is really hard
> > to think about), etc. Bad apis end up causing a ton of churn and just
> don't
> > end up being a good long term commitment as we change how the underlying
> > code works over time (i.e. we hyper optimize for something then have to
> > maintain some super weird api as it becomes hyper unoptimized for the
> > client over time).
> >
> > Roshan--Flush works as you would hope, it blocks on the completion of all
> > outstanding requests. Calling get on the future for the request gives you
> > the associated error code back. Flush doesn't throw any exceptions
> because
> > waiting for requests to complete doesn't error, the individual requests
> > fail or succeed which is always reported with each request.
> >
> > Ivan--The batches you send in the scala producer today actually aren't
> > truely atomic, they just get sent in a single request.
> >
> > One tricky problem to solve when user's do batching is size limits on
> > requests. This can be very hard to manage since predicting the serialized
> > size of a bunch of java objects is not always obvious. This was
> repeatedly
> > a problem before.
> >
> > -Jay
> >
> > On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov <ib...@gmail.com>
> > wrote:
> >
> > > I must agree with @Roshan – it's hard to imagine anything more
> intuitive
> > > and easy to use for atomic batching as old sync batch api. Also, it's
> > fast.
> > > Coupled with a separate instance of producer per
> > > broker:port:topic:partition it works very well. I would be glad if it
> > finds
> > > its way into new producer api.
> > >
> > > On a side-side-side note, could anyone confirm/deny if SimpleConsumer's
> > > fetchSize must be set at least as batch bytes (before or after
> > > compression), otherwise client risks not getting any messages?
> > >
> >
>



-- 
Thanks,
Ewen

Re: New Producer API - batched sync mode support

Posted by Gwen Shapira <gs...@cloudera.com>.
I'm starting to think that the old adage "If two people say you are drunk,
lie down" applies here :)

Current API seems perfectly clear, useful and logical to everyone who wrote
it... but we are getting multiple users asking for the old batch behavior
back.
One reason to get it back is to make upgrades easier - people won't need to
rethink their existing logic if they get an API with the same behavior in
the new producer. The other reason is what Ewen mentioned earlier - if
everyone re-implements Joel's logic, we can provide something for that.

How about getting the old batch send behavior back by adding a new API with:
public void batchSend(List<ProducerRecord<K,V>>)

With this implementation (mixes the old behavior with Joel's snippet):
* send records one by one
* flush
* iterate on futures and "get" them
* log a detailed message on each error
* throw an exception if any send failed.

It reproduces the old behavior - which apparently everyone really liked,
and I don't think it is overly weird. It is very limited, but anyone who
needs more control over their sends already has plenty of options.
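
The five steps listed above could look roughly like the following. This is a sketch under stated assumptions: MiniProducer is a hypothetical stand-in exposing only send() and flush() so the example compiles without the Kafka client, and logging to stderr and throwing IllegalStateException are arbitrary choices, not a proposed signature.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical stand-in for the two producer calls the wrapper needs.
interface MiniProducer<R> {
    Future<?> send(R record);
    void flush();
}

final class BatchSender {
    // Sends records one by one, flushes, checks every future, logs each
    // failure, and throws if any send failed.
    static <R> void batchSend(MiniProducer<R> producer, List<R> records) {
        List<Future<?>> futures = new ArrayList<>();
        for (R record : records) {
            futures.add(producer.send(record));
        }
        producer.flush();
        int failures = 0;
        for (int i = 0; i < futures.size(); i++) {
            try {
                futures.get(i).get();
            } catch (ExecutionException e) {
                failures++;
                System.err.println("record " + i + " failed: " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for sends", e);
            }
        }
        if (failures > 0) {
            throw new IllegalStateException(failures + " of " + records.size() + " sends failed");
        }
    }
}
```

Note that the exception only says the batch as a whole failed; some records in it may already have been written, which is the duplicate risk raised elsewhere in this thread.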

Thoughts?

Gwen




On Tue, Apr 28, 2015 at 5:29 PM, Jay Kreps <ja...@gmail.com> wrote:

> Hey guys,
>
> The locking argument is correct for very small records (< 50 bytes),
> batching will help here because for small records locking becomes the big
> bottleneck. I think these use cases are rare but not unreasonable.
>
> Overall I'd emphasize that the new producer is way faster at virtually all
> use cases. If there is a use case where that isn't true, let's look at it
> in a data driven way by comparing the old producer to the new producer and
> looking for any areas where things got worse.
>
> I suspect the "reducing allocations" argument to be not a big thing. We do
> a number of small per-message allocations and it didn't seem to have much
> impact. I do think there are a couple of big producer memory optimizations
> we could do by reusing the arrays in the accumulator in the serialization
> of the request but I don't think this is one of them.
>
> I'd be skeptical of any api that was too weird--i.e. introduces a new way
> of partitioning, gives back errors on a per-partition rather than per
> message basis (given that partitioning is transparent this is really hard
> to think about), etc. Bad apis end up causing a ton of churn and just don't
> end up being a good long term commitment as we change how the underlying
> code works over time (i.e. we hyper optimize for something then have to
> maintain some super weird api as it becomes hyper unoptimized for the
> client over time).
>
> Roshan--Flush works as you would hope, it blocks on the completion of all
> outstanding requests. Calling get on the future for the request gives you
> the associated error code back. Flush doesn't throw any exceptions because
> waiting for requests to complete doesn't error, the individual requests
> fail or succeed which is always reported with each request.
>
> Ivan--The batches you send in the scala producer today actually aren't
> truely atomic, they just get sent in a single request.
>
> One tricky problem to solve when user's do batching is size limits on
> requests. This can be very hard to manage since predicting the serialized
> size of a bunch of java objects is not always obvious. This was repeatedly
> a problem before.
>
> -Jay
>
> On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov <ib...@gmail.com>
> wrote:
>
> > I must agree with @Roshan – it's hard to imagine anything more intuitive
> > and easy to use for atomic batching as old sync batch api. Also, it's
> fast.
> > Coupled with a separate instance of producer per
> > broker:port:topic:partition it works very well. I would be glad if it
> finds
> > its way into new producer api.
> >
> > On a side-side-side note, could anyone confirm/deny if SimpleConsumer's
> > fetchSize must be set at least as batch bytes (before or after
> > compression), otherwise client risks not getting any messages?
> >
>

Re: New Producer API - batched sync mode support

Posted by Jay Kreps <ja...@gmail.com>.
Hey guys,

The locking argument is correct for very small records (< 50 bytes);
batching will help here because for small records locking becomes the big
bottleneck. I think these use cases are rare but not unreasonable.

Overall I'd emphasize that the new producer is way faster at virtually all
use cases. If there is a use case where that isn't true, let's look at it
in a data driven way by comparing the old producer to the new producer and
looking for any areas where things got worse.

I suspect the "reducing allocations" argument is not a big thing. We do
a number of small per-message allocations and it didn't seem to have much
impact. I do think there are a couple of big producer memory optimizations
we could do by reusing the arrays in the accumulator in the serialization
of the request but I don't think this is one of them.

I'd be skeptical of any api that was too weird--i.e. introduces a new way
of partitioning, gives back errors on a per-partition rather than per
message basis (given that partitioning is transparent this is really hard
to think about), etc. Bad apis end up causing a ton of churn and just don't
end up being a good long term commitment as we change how the underlying
code works over time (i.e. we hyper optimize for something then have to
maintain some super weird api as it becomes hyper unoptimized for the
client over time).

Roshan--Flush works as you would hope: it blocks on the completion of all
outstanding requests. Calling get on the future for the request gives you
the associated error code back. Flush doesn't throw any exceptions because
waiting for requests to complete doesn't error; the individual requests
fail or succeed, which is always reported with each request.

Ivan--The batches you send in the scala producer today actually aren't
truly atomic, they just get sent in a single request.

One tricky problem to solve when users do batching is size limits on
requests. This can be very hard to manage since predicting the serialized
size of a bunch of java objects is not always obvious. This was repeatedly
a problem before.
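
As an illustration of the size-limit problem, one way a client could cope is to serialize first and then cut batches by byte count. The sketch below assumes payloads are already serialized to byte arrays; SizeBoundedBatcher is a hypothetical name, and the count deliberately ignores per-record protocol overhead and compression, so the real request size would differ.

```java
import java.util.ArrayList;
import java.util.List;

final class SizeBoundedBatcher {
    // Splits payloads, in order, into batches whose summed payload bytes
    // stay within maxBytes. A single payload larger than maxBytes is
    // rejected instead of being placed in an oversized batch.
    static List<List<byte[]>> split(List<byte[]> payloads, int maxBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentBytes = 0;
        for (byte[] p : payloads) {
            if (p.length > maxBytes) {
                throw new IllegalArgumentException(
                        "payload of " + p.length + " bytes exceeds limit " + maxBytes);
            }
            if (currentBytes + p.length > maxBytes) {
                // Close the current batch and start a new one.
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(p);
            currentBytes += p.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```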

-Jay

On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov <ib...@gmail.com> wrote:

> I must agree with @Roshan – it's hard to imagine anything more intuitive
> and easy to use for atomic batching as old sync batch api. Also, it's fast.
> Coupled with a separate instance of producer per
> broker:port:topic:partition it works very well. I would be glad if it finds
> its way into new producer api.
>
> On a side-side-side note, could anyone confirm/deny if SimpleConsumer's
> fetchSize must be set at least as batch bytes (before or after
> compression), otherwise client risks not getting any messages?
>

Re: New Producer API - batched sync mode support

Posted by Ivan Balashov <ib...@gmail.com>.
I must agree with @Roshan – it's hard to imagine anything more intuitive
and easy to use for atomic batching than the old sync batch api. Also, it's fast.
Coupled with a separate instance of producer per
broker:port:topic:partition it works very well. I would be glad if it finds
its way into new producer api.

On a side-side-side note, could anyone confirm/deny if SimpleConsumer's
fetchSize must be set to at least the batch size in bytes (before or after
compression), otherwise the client risks not getting any messages?

Re: New Producer API - batched sync mode support

Posted by Roshan Naik <ro...@hortonworks.com>.
@Ewen
 No I did not use compression in my measurements.


Re: New Producer API - batched sync mode support

Posted by Roshan Naik <ro...@hortonworks.com>.
@Joel,
If flush() works for this use case it may be an acceptable starting point
(although not as clean as a native batched sync). I am not as yet clear
about some aspects of flush's batch semantics and its suitability for this
mode of operation. Allow me to explore it with you folks.

 1) flush() guarantees: What are the guarantees that one can expect when a
flush() call returns?  Is it successful delivery of all events in the
buffer to the broker as per the configured ack setting?

 2) flush() error handling:  It does not throw any exception. What is the
mechanism for indicating failure in delivery of one or more events in the
batch? Is future.get() the way to detect it? If so, will future.get() be
applicable to all types of delivery failures (could be a network glitch or
something simpler like Kafka responding that it was not able to
accept some events)?


 3) Multithreaded Clients: The situation being that each client thread is
trying to push out a batch. flush() will pump out data not just from
the calling thread but also from other threads. Need to think
through this a bit more to see if that's OK for multithreaded clients.


 4) Extra synchronization and Object creation: Like Ewen pointed out, this
method definitely creates too many (Future) objects and also too much
locking/synchronization due to repeated calls to Producer.send() and
future.get(). However, if the measured perf impact of this is not too much
then I guess it's OK.


@Ewen,
  I am unable to find the email where I mentioned a 10-byte event size. To
me, a 500-byte to 1 kB event size is more interesting. I had run measurements
with event sizes of 500 bytes, 1k, 4k, 8k and 16k. In the 0.8.1 producer_perf_test
tool, the 8k and 16k event sizes showed much better throughput in --sync
mode (throughput almost doubled with doubling of event size). The perf
tool was not using the Producer.send(list<> ) api though. I saw great
improvement when I changed it to use the Producer.send(list<> ). Sometimes
it easily exceeded the throughput of async mode.



-roshan





On 4/27/15 10:32 PM, "Ewen Cheslack-Postava" <ew...@confluent.io> wrote:

>A couple of thoughts:
>
>1. @Joel I agree it's not hard to use the new API but it definitely is
>more
>verbose. If that snippet of code is being written across hundreds of
>projects, that probably means we're missing an important API. Right now
>I've only seen the one complaint, but it's worth finding out how many
>people feel like it's missing. And given that internally each of the
>returned Futures just uses the future for the entire batch, I think it's
>probably worth investigating if getting rid of millions of allocs per
>second is worth it, even if they should be in the nursery and fast to
>collect.
>
>2. For lots of small messages, there's definitely the potential for a
>performance benefit by avoiding a lot of lock acquire/release in send().
>If
>you make a first pass to organize by topic partition and then process each
>group, you lock # of partitions times rather than # of messages times. One
>major drawback I see is that it seems to make a mess of error
>handling/blocking when the RecordAccumulator runs out of space.
>
>3. @Roshan In the other thread you mentioned 10 byte messages. Is this a
>realistic payload size for you? I can imagine applications where it is
>(and
>we should support those well), it just sounds unusually small.
>
>4. I reproduced Jay's benchmark blog post awhile ago in an automated test
>(see
>https://github.com/confluentinc/muckrake/blob/master/muckrake/tests/kafka_
>benchmark_test.py).
>Here's a snippet from the output on m3.2xlarge instances that might help
>shed some light on the situation:
>INFO:_.KafkaBenchmark:Message size:
>INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.620000 MB/s)
>INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.750000 MB/s)
>INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.170000 MB/s)
>INFO:_.KafkaBenchmark: 10000: 8306.180862 rec/sec (79.210000 MB/s)
>INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.310000 MB/s)
>
>That's using the single-threaded new ProducerPerformance class, so the
>m3.2xlarge's # of cores probably has little influence. There's clearly a
>sharp increase in throughput from 10 -> 100 byte messages. I recall double
>checking that the CPU was fully utilized. Note that this is with the
>acks=1
>setting that doesn't actually exist anymore, so take with a grain of salt.
>
>5. I'd suggest that there may be other APIs that give the implementation
>more flexibility but still provide batching. For example:
>* Require batched inputs to be prepartitioned so each call specifies the
>TopicPartition. Main benefit here is that the producer avoids having to do
>all the sorting, which the application may already be doing anyway.
>* How about an API similar to fwrite() where you provide a set of messages
>but it may only write some of them and tells you how many it wrote? This
>could be a clean way to expose the underlying batching that is performed
>without being a completely leaky abstraction. We could then return just a
>single future for the entire batch, we'd do minimal locking, etc. Not sure
>how to handle different TopicPartitions in the same set. I think this
>could
>be a good pattern for people who want maximally efficient ordered writes
>where errors are properly handled too.
>
>6. If I recall correctly, doesn't compression occur in a synchronized
>block, I think in the RecordAccumulator? Or maybe it was in the network
>thread? In any case, I seem to recall compression also possibly playing an
>important role in performance because it operates over a set of records
>which limits where you can run it. @Roshan, are you using compression,
>both
>in your microbenchmarks and your application?
>
>I think there's almost definitely a good case to be made for a batch API,
>but probably needs some very clear motivating use cases and perf
>measurements showing why it's not going to be feasible to accomplish with
>the current API + a few helpers to wrap it in a batch API.
>
>-Ewen
>
>
>On Mon, Apr 27, 2015 at 4:24 PM, Joel Koshy <jj...@gmail.com> wrote:
>
>>
>> >   Fine grained tracking of status of individual events is quite
>>painful
>> in
>> > contrast to simply blocking on every batch. Old style Batched-sync
>>mode
>> > has great advantages in terms of simplicity and performance.
>>
>> I may be missing something, but I'm not so convinced that it is that
>> painful/very different from the old-style.
>>
>> In the old approach, you would compose a batch (in a list of messages)
>> and do a synchronous send:
>>
>> try {
>>   producer.send(recordsToSend)
>> }
>> catch (...) {
>>   // handle (e.g., retry sending recordsToSend)
>> }
>>
>> In the new approach, you would do (something like) this:
>>
>> for (record: recordsToSend) {
>>   futureList.add(producer.send(record));
>> }
>> producer.flush();
>> for (result: futureList) {
>>   try { result.get(); }
>>   catch (...) { // handle (e.g., retry sending recordsToSend) }
>> }
>>
>>
>>
>
>
>-- 
>Thanks,
>Ewen


Re: New Producer API - batched sync mode support

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
A couple of thoughts:

1. @Joel I agree it's not hard to use the new API but it definitely is more
verbose. If that snippet of code is being written across hundreds of
projects, that probably means we're missing an important API. Right now
I've only seen the one complaint, but it's worth finding out how many
people feel like it's missing. And given that internally each of the
returned Futures just uses the future for the entire batch, I think it's
probably worth investigating if getting rid of millions of allocs per
second is worth it, even if they should be in the nursery and fast to
collect.

2. For lots of small messages, there's definitely the potential for a
performance benefit by avoiding a lot of lock acquire/release in send(). If
you make a first pass to organize by topic partition and then process each
group, you lock # of partitions times rather than # of messages times. One
major drawback I see is that it seems to make a mess of error
handling/blocking when the RecordAccumulator runs out of space.

3. @Roshan In the other thread you mentioned 10 byte messages. Is this a
realistic payload size for you? I can imagine applications where it is (and
we should support those well), it just sounds unusually small.

4. I reproduced Jay's benchmark blog post a while ago in an automated test
(see
https://github.com/confluentinc/muckrake/blob/master/muckrake/tests/kafka_benchmark_test.py).
Here's a snippet from the output on m3.2xlarge instances that might help
shed some light on the situation:
INFO:_.KafkaBenchmark:Message size:
INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.620000 MB/s)
INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.750000 MB/s)
INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.170000 MB/s)
INFO:_.KafkaBenchmark: 10000: 8306.180862 rec/sec (79.210000 MB/s)
INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.310000 MB/s)

That's using the single-threaded new ProducerPerformance class, so the
m3.2xlarge's # of cores probably has little influence. There's clearly a
sharp increase in throughput from 10 -> 100 byte messages. I recall double
checking that the CPU was fully utilized. Note that this is with the acks=1
setting that doesn't actually exist anymore, so take with a grain of salt.

5. I'd suggest that there may be other APIs that give the implementation
more flexibility but still provide batching. For example:
* Require batched inputs to be prepartitioned so each call specifies the
TopicPartition. Main benefit here is that the producer avoids having to do
all the sorting, which the application may already be doing anyway.
* How about an API similar to fwrite() where you provide a set of messages
but it may only write some of them and tells you how many it wrote? This
could be a clean way to expose the underlying batching that is performed
without being a completely leaky abstraction. We could then return just a
single future for the entire batch, we'd do minimal locking, etc. Not sure
how to handle different TopicPartitions in the same set. I think this could
be a good pattern for people who want maximally efficient ordered writes
where errors are properly handled too.

6. If I recall correctly, doesn't compression occur in a synchronized
block, I think in the RecordAccumulator? Or maybe it was in the network
thread? In any case, I seem to recall compression also possibly playing an
important role in performance because it operates over a set of records
which limits where you can run it. @Roshan, are you using compression, both
in your microbenchmarks and your application?
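
For point 2 above, the "first pass to organize by topic partition" might look like the sketch below. Rec and PartitionGrouper are hypothetical types for illustration, and the sketch assumes the partition has already been chosen for each record; it shows only the grouping step, not the locking inside the producer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PartitionGrouper {
    // Hypothetical record: topic, an already-chosen partition, and a payload.
    static final class Rec {
        final String topic;
        final int partition;
        final String value;

        Rec(String topic, int partition, String value) {
            this.topic = topic;
            this.partition = partition;
            this.value = value;
        }
    }

    // Groups records by "topic-partition", preserving the original order
    // within each group, so later work can be done once per group rather
    // than once per record.
    static Map<String, List<Rec>> groupByTopicPartition(List<Rec> records) {
        Map<String, List<Rec>> groups = new LinkedHashMap<>();
        for (Rec r : records) {
            groups.computeIfAbsent(r.topic + "-" + r.partition, k -> new ArrayList<>()).add(r);
        }
        return groups;
    }
}
```

With N records spread over P partitions, a per-group append would take a lock P times instead of N times, which is the saving point 2 describes.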

I think there's almost definitely a good case to be made for a batch API,
but it probably needs some very clear motivating use cases and perf
measurements showing why it's not going to be feasible to accomplish with
the current API + a few helpers to wrap it in a batch API.

-Ewen


On Mon, Apr 27, 2015 at 4:24 PM, Joel Koshy <jj...@gmail.com> wrote:

>
> >   Fine grained tracking of status of individual events is quite painful
> in
> > contrast to simply blocking on every batch. Old style Batched-sync mode
> > has great advantages in terms of simplicity and performance.
>
> I may be missing something, but I'm not so convinced that it is that
> painful/very different from the old-style.
>
> In the old approach, you would compose a batch (in a list of messages)
> and do a synchronous send:
>
> try {
>   producer.send(recordsToSend)
> }
> catch (...) {
>   // handle (e.g., retry sending recordsToSend)
> }
>
> In the new approach, you would do (something like) this:
>
> for (record: recordsToSend) {
>   futureList.add(producer.send(record));
> }
> producer.flush();
> for (result: futureList) {
>   try { result.get(); }
>   catch (...) { // handle (e.g., retry sending recordsToSend) }
> }
>
>
>


-- 
Thanks,
Ewen

Re: New Producer API - batched sync mode support

Posted by Joel Koshy <jj...@gmail.com>.
>   Fine grained tracking of status of individual events is quite painful in
> contrast to simply blocking on every batch. Old style Batched-sync mode
> has great advantages in terms of simplicity and performance.

I may be missing something, but I'm not so convinced that it is that
painful/very different from the old-style.

In the old approach, you would compose a batch (in a list of messages)
and do a synchronous send:

try {
  producer.send(recordsToSend)
}
catch (...) {
  // handle (e.g., retry sending recordsToSend)
}

In the new approach, you would do (something like) this:

for (ProducerRecord<K, V> record : recordsToSend) {
  futureList.add(producer.send(record));
}
producer.flush();
for (Future<RecordMetadata> result : futureList) {
  try {
    result.get();
  } catch (InterruptedException | ExecutionException e) {
    // handle (e.g., retry sending recordsToSend)
  }
}



Re: New Producer API - batched sync mode support

Posted by Roshan Naik <ro...@hortonworks.com>.

On 4/27/15 2:59 PM, "Gwen Shapira" <gs...@cloudera.com> wrote:

>@Roshan - if the data was already written to Kafka, your approach will
>generate LOTS of duplicates. I'm not convinced its ideal.


Only if the delivery failure rate is very high (i.e. short lived but very
frequent).  These batch semantics are not uncommon. Also, since Kafka had
sync-batching in the past, it would not be new to Kafka either.

But, like I had mentioned, there is an alternative to mitigate this
duplication... Return value could indicate failed messages.



Wanted to add that these failures typically assume that we got an
"error" back from the Kafka broker. There are other modes of failure (like
network glitches) which will mean you only get some (potentially not very
informative) exception.



>
>What's wrong with callbacks?


The complexities of fine grained tracking  .. that I described in prev
email.



Re: New Producer API - batched sync mode support

Posted by Gwen Shapira <gs...@cloudera.com>.
@Roshan - if the data was already written to Kafka, your approach will
generate LOTS of duplicates. I'm not convinced it's ideal.

What's wrong with callbacks?

On Mon, Apr 27, 2015 at 2:53 PM, Roshan Naik <ro...@hortonworks.com> wrote:

> @Gwen
>
>  - A failure in delivery of one or more events in the batch (typical Flume
> case) is considered a failure of the entire batch and the client
> redelivers the entire batch.
>  - If clients want more fine grained control, alternative option is to
> indicate which events failed in the return value of  producer.send(list<>)
>
>
> @Joel
>   Fine grained tracking of status of individual events is quite painful in
> contrast to simply blocking on every batch. Old style Batched-sync mode
> has great advantages in terms of simplicity and performance.
>   Imagine a simple use case of client simply reading a directory of log
> files and splitting them into log messages/events and pushing them through
> kafka. Becomes more complex when all this tracking data needs to be
> persisted to accommodate for client restarts/crashes. Tracking a simple
> current line number and file name is easy to programming with and persist
> to accommodate... as opposed start/end position of each log message in the
> file.
>
>
> -roshan
>
>
>
>
>
> On 4/27/15 2:07 PM, "Joel Koshy" <jj...@gmail.com> wrote:
>
> >As long as you retain the returned futures somewhere, you can always
> >iterate over the futures after the flush completes and check for
> >success/failure. Would that work for you?
> >
> >On Mon, Apr 27, 2015 at 08:53:36PM +0000, Roshan Naik wrote:
> >> The important guarantee that is needed for a client producer thread is
> >> that it requires an indication of success/failure of the batch of events
> >> it pushed. Essentially it needs to retry producer.send() on that same
> >> batch in case of failure. My understanding is that flush will simply
> >>flush
> >> data from all threads (correct me if I am wrong).
> >>
> >> -roshan
> >>
> >>
> >>
> >> On 4/27/15 1:36 PM, "Joel Koshy" <jj...@gmail.com> wrote:
> >>
> >> >This sounds like flush:
> >>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+me
> >>>th
> >> >od+to+the+producer+API
> >> >
> >> >which was recently implemented in trunk.
> >> >
> >> >Joel
> >> >
> >> >On Mon, Apr 27, 2015 at 08:19:40PM +0000, Roshan Naik wrote:
> >> >> Been evaluating the perf of old and new Produce APIs for reliable
> >>high
> >> >>volume streaming data movement. I do see one area of improvement that
> >> >>the new API could use for synchronous clients.
> >> >>
> >> >> AFAIKT, the new API does not support batched synchronous transfers.
> >>To
> >> >>do synchronous send, one needs to do a future.get() after every
> >> >>Producer.send(). I changed the new
> >> >>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this
> >> >>mode of operation. May not be surprising that it much slower than the
> >> >>async mode... hard t push it beyond 4MB/s.
> >> >>
> >> >> The 0.8.1 Scala based producer API supported a batched sync mode via
> >> >>Producer.send( List<KeyedMessage> ) . My measurements show that it was
> >> >>able to approach (and sometimes exceed) the old async speeds...
> >>266MB/s
> >> >>
> >> >>
> >> >> Supporting this batched sync mode is very critical for streaming
> >> >>clients (such as flume for example) that need delivery guarantees.
> >> >>Although it can be done with Async mode, it requires additional book
> >> >>keeping as to which events are delivered and which ones are not. The
> >> >>programming model becomes much simpler with the batched sync mode.
> >> >>Client having to deal with one single future.get() helps performance
> >> >>greatly too as I noted.
> >> >>
> >> >> Wanted to propose adding this as an enhancement to the new Producer
> >>API.
> >> >
> >>
> >
>
>

Re: New Producer API - batched sync mode support

Posted by Roshan Naik <ro...@hortonworks.com>.
@Gwen

 - A failure in delivery of one or more events in the batch (typical Flume
case) is considered a failure of the entire batch and the client
redelivers the entire batch.
 - If clients want more fine grained control, alternative option is to
indicate which events failed in the return value of  producer.send(list<>)


@Joel
  Fine grained tracking of status of individual events is quite painful in
contrast to simply blocking on every batch. Old style Batched-sync mode
has great advantages in terms of simplicity and performance.
  Imagine a simple use case of client simply reading a directory of log
files and splitting them into log messages/events and pushing them through
kafka. Becomes more complex when all this tracking data needs to be
persisted to accommodate for client restarts/crashes. Tracking a simple
current line number and file name is easy to program with and persist,
as opposed to the start/end position of each log message in the
file.


-roshan





On 4/27/15 2:07 PM, "Joel Koshy" <jj...@gmail.com> wrote:

>As long as you retain the returned futures somewhere, you can always
>iterate over the futures after the flush completes and check for
>success/failure. Would that work for you?
>
>On Mon, Apr 27, 2015 at 08:53:36PM +0000, Roshan Naik wrote:
>> The important guarantee that is needed for a client producer thread is
>> that it requires an indication of success/failure of the batch of events
>> it pushed. Essentially it needs to retry producer.send() on that same
>> batch in case of failure. My understanding is that flush will simply
>>flush
>> data from all threads (correct me if I am wrong).
>> 
>> -roshan
>> 
>> 
>> 
>> On 4/27/15 1:36 PM, "Joel Koshy" <jj...@gmail.com> wrote:
>> 
>> >This sounds like flush:
>> 
>> >https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
>> >
>> >which was recently implemented in trunk.
>> >
>> >Joel
>> >
>> >On Mon, Apr 27, 2015 at 08:19:40PM +0000, Roshan Naik wrote:
>> >> Been evaluating the perf of old and new Produce APIs for reliable high
>> >> volume streaming data movement. I do see one area of improvement that
>> >> the new API could use for synchronous clients.
>> >> 
>> >> AFAIKT, the new API does not support batched synchronous transfers. To
>> >> do synchronous send, one needs to do a future.get() after every
>> >> Producer.send(). I changed the new
>> >> o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this
>> >> mode of operation. May not be surprising that it much slower than the
>> >> async mode... hard t push it beyond 4MB/s.
>> >> 
>> >> The 0.8.1 Scala based producer API supported a batched sync mode via
>> >> Producer.send( List<KeyedMessage> ) . My measurements show that it was
>> >> able to approach (and sometimes exceed) the old async speeds... 266MB/s
>> >> 
>> >> 
>> >> Supporting this batched sync mode is very critical for streaming
>> >> clients (such as flume for example) that need delivery guarantees.
>> >> Although it can be done with Async mode, it requires additional book
>> >> keeping as to which events are delivered and which ones are not. The
>> >> programming model becomes much simpler with the batched sync mode.
>> >> Client having to deal with one single future.get() helps performance
>> >> greatly too as I noted.
>> >> 
>> >> Wanted to propose adding this as an enhancement to the new Producer API.
>> >
>> 
>


Re: New Producer API - batched sync mode support

Posted by Joel Koshy <jj...@gmail.com>.
As long as you retain the returned futures somewhere, you can always
iterate over the futures after the flush completes and check for
success/failure. Would that work for you?
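
For illustration, the approach above might look roughly like the following
Java sketch. It is not the actual client code: AsyncSender is a hypothetical
stand-in for the producer's send() and flush() calls, and FakeSender is a
made-up in-memory implementation (it rejects any event starting with "bad")
so that the example runs without a broker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class BatchedSyncSend {

    /** Hypothetical stand-in for the two producer calls discussed here. */
    public interface AsyncSender<T> {
        Future<Void> send(T event);
        void flush();
    }

    /**
     * Sends every event asynchronously, retains the futures, flushes once,
     * then checks each future. Returns the indices of the events that failed;
     * an empty list means the whole batch was delivered.
     */
    public static <T> List<Integer> sendBatch(AsyncSender<T> sender, List<T> batch) {
        List<Future<Void>> futures = new ArrayList<>(batch.size());
        for (T event : batch) {
            futures.add(sender.send(event));
        }
        sender.flush(); // block once for the whole batch instead of once per event

        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            try {
                futures.get(i).get(); // already completed after the flush
            } catch (ExecutionException e) {
                failed.add(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while checking batch", e);
            }
        }
        return failed;
    }

    /** Made-up in-memory sender: completes all pending futures on flush(). */
    public static class FakeSender implements AsyncSender<String> {
        private final List<Runnable> pending = new ArrayList<>();

        @Override
        public Future<Void> send(String event) {
            CompletableFuture<Void> result = new CompletableFuture<>();
            pending.add(() -> {
                if (event.startsWith("bad")) {
                    result.completeExceptionally(new RuntimeException("rejected: " + event));
                } else {
                    result.complete(null);
                }
            });
            return result;
        }

        @Override
        public void flush() {
            pending.forEach(Runnable::run);
            pending.clear();
        }
    }

    public static void main(String[] args) {
        List<Integer> failed = sendBatch(new FakeSender(), Arrays.asList("a", "bad-1", "c"));
        System.out.println("failed indices: " + failed);
    }
}
```

If the returned list is non-empty, a client that treats the batch as
all-or-nothing could simply resend the entire batch.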

On Mon, Apr 27, 2015 at 08:53:36PM +0000, Roshan Naik wrote:
> The important guarantee that is needed for a client producer thread is
> that it requires an indication of success/failure of the batch of events
> it pushed. Essentially it needs to retry producer.send() on that same
> batch in case of failure. My understanding is that flush will simply flush
> data from all threads (correct me if I am wrong).
> 
> -roshan
> 
> 
> 
> On 4/27/15 1:36 PM, "Joel Koshy" <jj...@gmail.com> wrote:
> 
> >This sounds like flush:
> >https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> >
> >which was recently implemented in trunk.
> >
> >Joel
> >
> >On Mon, Apr 27, 2015 at 08:19:40PM +0000, Roshan Naik wrote:
> >> Been evaluating the perf of old and new Produce APIs for reliable high
> >>volume streaming data movement. I do see one area of improvement that
> >>the new API could use for synchronous clients.
> >> 
> >> AFAIKT, the new API does not support batched synchronous transfers. To
> >>do synchronous send, one needs to do a future.get() after every
> >>Producer.send(). I changed the new
> >>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this
> >>mode of operation. May not be surprising that it much slower than the
> >>async mode... hard t push it beyond 4MB/s.
> >> 
> >> The 0.8.1 Scala based producer API supported a batched sync mode via
> >>Producer.send( List<KeyedMessage> ) . My measurements show that it was
> >>able to approach (and sometimes exceed) the old async speeds... 266MB/s
> >> 
> >> 
> >> Supporting this batched sync mode is very critical for streaming
> >>clients (such as flume for example) that need delivery guarantees.
> >>Although it can be done with Async mode, it requires additional book
> >>keeping as to which events are delivered and which ones are not. The
> >>programming model becomes much simpler with the batched sync mode.
> >>Client having to deal with one single future.get() helps performance
> >>greatly too as I noted.
> >> 
> >> Wanted to propose adding this as an enhancement to the new Producer API.
> >
> 


Re: New Producer API - batched sync mode support

Posted by Roshan Naik <ro...@hortonworks.com>.
The important guarantee that is needed for a client producer thread is
that it requires an indication of success/failure of the batch of events
it pushed. Essentially it needs to retry producer.send() on that same
batch in case of failure. My understanding is that flush will simply flush
data from all threads (correct me if I am wrong).

-roshan



On 4/27/15 1:36 PM, "Joel Koshy" <jj...@gmail.com> wrote:

>This sounds like flush:
>https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
>
>which was recently implemented in trunk.
>
>Joel
>
>On Mon, Apr 27, 2015 at 08:19:40PM +0000, Roshan Naik wrote:
>> Been evaluating the perf of old and new Produce APIs for reliable high
>>volume streaming data movement. I do see one area of improvement that
>>the new API could use for synchronous clients.
>> 
>> AFAIKT, the new API does not support batched synchronous transfers. To
>>do synchronous send, one needs to do a future.get() after every
>>Producer.send(). I changed the new
>>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this
>>mode of operation. May not be surprising that it much slower than the
>>async mode... hard t push it beyond 4MB/s.
>> 
>> The 0.8.1 Scala based producer API supported a batched sync mode via
>>Producer.send( List<KeyedMessage> ) . My measurements show that it was
>>able to approach (and sometimes exceed) the old async speeds... 266MB/s
>> 
>> 
>> Supporting this batched sync mode is very critical for streaming
>>clients (such as flume for example) that need delivery guarantees.
>>Although it can be done with Async mode, it requires additional book
>>keeping as to which events are delivered and which ones are not. The
>>programming model becomes much simpler with the batched sync mode.
>>Client having to deal with one single future.get() helps performance
>>greatly too as I noted.
>> 
>> Wanted to propose adding this as an enhancement to the new Producer API.
>


Re: New Producer API - batched sync mode support

Posted by Joel Koshy <jj...@gmail.com>.
This sounds like flush:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API

which was recently implemented in trunk.

Joel

On Mon, Apr 27, 2015 at 08:19:40PM +0000, Roshan Naik wrote:
> Been evaluating the perf of old and new Produce APIs for reliable high volume streaming data movement. I do see one area of improvement that the new API could use for synchronous clients.
> 
> AFAIKT, the new API does not support batched synchronous transfers. To do synchronous send, one needs to do a future.get() after every Producer.send(). I changed the new o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this mode of operation. May not be surprising that it much slower than the async mode... hard t push it beyond 4MB/s.
> 
> The 0.8.1 Scala based producer API supported a batched sync mode via Producer.send( List<KeyedMessage> ) . My measurements show that it was able to approach (and sometimes exceed) the old async speeds... 266MB/s
> 
> 
> Supporting this batched sync mode is very critical for streaming clients (such as flume for example) that need delivery guarantees. Although it can be done with Async mode, it requires additional book keeping as to which events are delivered and which ones are not. The programming model becomes much simpler with the batched sync mode. Client having to deal with one single future.get() helps performance greatly too as I noted.
> 
> Wanted to propose adding this as an enhancement to the new Producer API.