You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Paul Sutter <ps...@quantbench.com> on 2011/07/19 03:41:40 UTC

Kafka questions

Kafka looks like an exciting project, thanks for opening it up.

I have a few questions:

1. Are checksums end to end (ie, created by the producer and checked by the
consumer)? or are they only used to confirm buffercache behavior on disk as
mentioned in the documentation? Bit errors occur vastly more often than most
people assume, often because of device driver bugs. TCP only detects 1 error
in 65536, so errors can flow through (if you like I can send links to papers
describing the need for checksums everywhere).

2. The consumer has a pretty solid mechanism to ensure it hasnt missed any
messages (i like the design by the way), but how does the producer know that
all of its messages have been stored? (no apparent message id on that side
since the message id isnt known until the message is written to the file).
I'm especially curious how failover/replication could be implemented and I'm
thinking that acks on the publisher side may help)

3. Has the consumer's flow control been tested over high bandwidth*delay
links? (what bandwidth can you get from a London consumer of an SF cluster?)

4. What kind of performance do you get if you set the producer's message
delay to zero? (ie, is there a separate system call for each message? or do
you manage to aggregate messages into a smaller number of system calls even
with a delay of 0?)

5. Have you considered using a library like zeromq for the messaging layer
instead of rolling your own? (zeromq will handle #4 cleanly at millions of
messages per second and has clients in 20 languages)

6. Do you have any plans to support intermediate processing elements the way
Flume supports?

7. The docs mention that new versions will only be released after they are
in production at LinkedIn? Does that mean that the latest version of the
source code is hidden at LinkedIn and contributors would have to throw
patches over the wall and wait months to get the integrated product?

Thanks!

Re: Kafka questions

Posted by Jun Rao <ju...@gmail.com>.
Hi, Paul,

We'd love to get help from you if Kafka fits your need. See my inlined reply
below. It seems that you care about latency and reliability more than
throughput. Are you dealing with high-volume events here?

Jun

On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <ps...@quantbench.com> wrote:

> Jun
>
> Thanks for your answers and the link to the paper - that helps a lot,
> especially the comment in the paper that 10 second end to end latency is
> good enough for your intended use case.
>
> We're looking for much lower latencies, and the basic design of Kafka feels
> like it should support latencies in milliseconds with a few changes. We're
> either going to build our own system, or help develop something that
> already
> exists, so please take my comments in the constructive way they're intended
> (I realize the changes I'm suggesting are outside your intended use case,
> but if you're interested we may be able to provide a very capable developer
> to help with the work, assuming we choose kafka over the other zillion
> streaming systems that are coming out of the woodwork).
>
> a. *Producer "queue.time"* - In my question 4 below, I was referring to the
> producer queue time.  With a default value of 5 seconds, that accounts for
> half your end to end latency. A system like zeromq is optimized to write
> data immediately without delay, but in such a way to minimizes the number
> of
> system calls required during high throughput messages. Zeromq is no
> nirvana,
> but it has a number of nice properties.
>

One can reduce queue.time to reduce latency. Our producer api also allows a
user to send a message synchronously. This will increase the number of RPC
calls. Whether this becomes a problem depends on your data volume.


>
> b. *Broker "log.default.flush.interval.ms"* - The default value of 3
> seconds
> appears to be another significant source of latency in the system, assuming
> that clients are unable to access data until it has been flushed. Since you
> have wisely chosen to take advantage of the buffer cache as part of your
> system design, it seems that you could remove this latency completely by
> memory mapping the partitions and memcpying each message as it arrives.
> With
> the right IPC mechanism clients could have immediate access to new
> messages.
>
> Currently, we only expose flushed messages to the consumer. I would like to
have a continuous flusher, which flushes dirty pages to disk asynchronously
as fast as it can. That way, a user doesn't have to set the flush intervals
manually. Also, in our replication design, we will allow a message to be
exposed to the consumer as soon as the message hits multiple brokers in
memory.


> c. *Batching, sync vs async, replication, and auditing*. Its understandable
> that you've chosen a a forensic approach to producer reliability (after the
> fact auditing), but when you implement replication it would be really nice
> to revise the producer protocol mechanisms. If you used a streaming
> mechanism with producer offsets and ACKs, you could ensure reliable
> delivery
> of producer streams to multiple brokers without the need to choose a "batch
> size" or "queue.time". This could also give you active/active failover of
> brokers. This may also help in the WAN case (my question 3 below) because
> you will be able to adaptively stuff more and more data through the fiber
> for high bandwidth*delay links without having to choose a large "batch
> size"
> nor have the additional latency that entails. Oh, and it will help you deal
> with CRC errors once you start checking for them.
>
> We do plan to add producer side ACK, likely as part of the replication
work. Our current replication design is intended for a cluster of machines
within the same data center. Across DC, we still plan to use asynchronously
replication through embedded consumers. However, even with async
replication, I believe that we can achieve latency much better than 10 secs,
by shrinking the buffering time, flush interval, etc. Since a round trip RPC
across DC can be 100ms itself, it would be hard to keep the latency at the
millisecs level. A couple of seconds delay should be achievable.

c. *Performance measurements* - I'd like to make a suggestion for your
> performance measurements. Your benchmarks measure throughput, but a
> throughput number is meaningless without an associated "% cpu time".
> Ideally
> all measurements achieve wire speed (100MB/sec) at 0% CPU (since, after
> all,
> this is plumbing and we assume the cores in the system should have capacity
> set aside for useful work too). Obviously nobody ever achieves this, but by
> measuring it one can raise the bar in terms of optimization.
>
> That's a good suggestion. The CPU usage for Kafka is really low (typically
less than 10%) and our performance is really bound by disk I/O.


> Paul
>
> ps. Just for background, I am the cofounder at Quantcast where we process
> 3.5PB of data per day. These questions are related to my new startup
> Quantbench which will deal with financial market data where you dont want
> any latency at all. And WAN issues are a big deal too. Incidentally, I was
> also founder of Orbital Data which was a WAN optimization company so I've
> done a lot of work with protocols over long distances.
>
> On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <ju...@gmail.com> wrote:
>
> > Paul,
> >
> > Excellent questions. See my answers below. Thanks,
> >
> > On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <ps...@quantbench.com>
> > wrote:
> >
> > > Kafka looks like an exciting project, thanks for opening it up.
> > >
> > > I have a few questions:
> > >
> > > 1. Are checksums end to end (ie, created by the producer and checked by
> > the
> > > consumer)? or are they only used to confirm buffercache behavior on
> disk
> > as
> > > mentioned in the documentation? Bit errors occur vastly more often than
> > > most
> > > people assume, often because of device driver bugs. TCP only detects 1
> > > error
> > > in 65536, so errors can flow through (if you like I can send links to
> > > papers
> > > describing the need for checksums everywhere).
> > >
> >
> > Checksum is generated at the producer and propagated to the broker and
> > eventually the consumer. Currently, we only validate the checksum at the
> > broker. We could further validate it at the consumer in the future.
> >
> > >
> > > 2. The consumer has a pretty solid mechanism to ensure it hasnt missed
> > any
> > > messages (i like the design by the way), but how does the producer know
> > > that
> > > all of its messages have been stored? (no apparent message id on that
> > side
> > > since the message id isnt known until the message is written to the
> > file).
> > > I'm especially curious how failover/replication could be implemented
> and
> > > I'm
> > > thinking that acks on the publisher side may help)
> > >
> >
> > The producer side auditing is not built-in. At LinkedIn, we do that by
> > generating an auditing event periodically in the eventhandler of the
> async
> > producer. The auditing event contains the number of events produced in a
> > configured window (e.g., 10 minutes) and are sent to a separate topic.
> The
> > consumer can read the actual data and the auditing event and compare the
> > counts. See our paper (
> >
> >
> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
> > )
> > for some more details.
> >
> >
> > >
> > > 3. Has the consumer's flow control been tested over high
> bandwidth*delay
> > > links? (what bandwidth can you get from a London consumer of an SF
> > > cluster?)
> > >
> > > Yes, we actually replicate kafka data across data centers, using an
> > embedded consumer in a broker. Again, there is a bit more info on this in
> > our paper.
> >
> >
> > > 4. What kind of performance do you get if you set the producer's
> message
> > > delay to zero? (ie, is there a separate system call for each message?
> or
> > do
> > > you manage to aggregate messages into a smaller number of system calls
> > even
> > > with a delay of 0?)
> > >
> > > I assume that you are referring to the flush interval. One can
> configure
> > to
> > flush every message to disk. This will slow down the throughput
> > significantly.
> >
> >
> > > 5. Have you considered using a library like zeromq for the messaging
> > layer
> > > instead of rolling your own? (zeromq will handle #4 cleanly at millions
> > of
> > > messages per second and has clients in 20 languages)
> > >
> > > No. Our proprietary format allows us to support things like compression
> > in
> > the future. However, we can definitely look into the zeromq format. Is
> > their
> > messaging layer easily extractable?
> >
> >
> > > 6. Do you have any plans to support intermediate processing elements
> the
> > > way
> > > Flume supports?
> > >
> > > For now, we are just focusing on getting the raw messaging layer solid.
> > We
> > have worked a bit on streaming processing and will look into that again
> in
> > the future.
> >
> >
> > > 7. The docs mention that new versions will only be released after they
> > are
> > > in production at LinkedIn? Does that mean that the latest version of
> the
> > > source code is hidden at LinkedIn and contributors would have to throw
> > > patches over the wall and wait months to get the integrated product?
> > >
> > > What we ran at LinkedIn is the same version in open source and there is
> > no
> > internal repository of Kafka at LinkedIn. We plan to maintain that in the
> > future.
> >
> >
> > > Thanks!
> > >
> >
>

Re: Kafka questions

Posted by "Alan D. Cabrera" <li...@toolazydogs.com>.
Good examples.  Thanks for pointing them out.  Now that I see that PDFs are not a barrier to open discussions, it's up to you guys.

Thanks again!


Regards,
Alan


On Jul 20, 2011, at 11:17 AM, Jakob Homan wrote:

> I'm not meaning to push back; just curious as to the drawbacks of PDFs
> versus wikis.  I was surprised to see PDF-backed design docs described
> as "very bad" when I've seen this approach work well in multiple
> projects (e.g. HDFS-265, MAPREDUCE-326, HBASE-3857, ZOOKEEPER-1016,
> BOOKKEEPER-11, HDFS-1073, HIVE-1555).  Posting PDFs and the
> change-driver incorporating feedback until consensus emerges seems
> like a more natural counterpart to our SOP of posting patches and the
> coder incorporating feedback until +1 is given.
> -jg
> 
> 
> On Wed, Jul 20, 2011 at 10:41 AM, Alan D. Cabrera <li...@toolazydogs.com> wrote:
>> Yeah, that's the point of a wiki, sharing.  One person should not own a design doc.  Frankly, I don't understand the push back for such a simple document.
>> 
>> 
>> Regards,
>> Alan
>> 
>> 
>> On Jul 20, 2011, at 10:34 AM, Jakob Homan wrote:
>> 
>>> Doesn't need to be, but could be.  It's usually up to the person
>>> proposing the change/driving the discussion to create new versions of
>>> the PDF.  In my experience, when people attached the Word doc, others
>>> would complain that they didn't have Word, and when people attached,
>>> e.g., the laTex document people would have complain they didn't know
>>> ancient Egyptian...
>>> -jg
>>> 
>>> On Wed, Jul 20, 2011 at 10:19 AM, Alan D. Cabrera <li...@toolazydogs.com> wrote:
>>>> Are you saying that the source document for the PDF is also attached to the issue?  I don't see it in KAFKA-50.
>>>> 
>>>> 
>>>> Regards,
>>>> Alan
>>>> 
>>>> On Jul 20, 2011, at 10:11 AM, Jakob Homan wrote:
>>>> 
>>>>> I don't have anything against wikis - they're great for information
>>>>> that changes more frequently than releases are made and should be
>>>>> user-facing (configuration, FAQs, etc).
>>>>> 
>>>>> For large technical changes, like the one currently being propsosed,
>>>>> the PDF isn't static, but will have several versions posted.  The
>>>>> whole discussion is: PDF version 0, then comments on that PDF, then
>>>>> PDFv1, then more discussions until eventually the discussion turns
>>>>> into +1s and the final version of the PDF is attached.  The JIRA does
>>>>> a good job of chronicling the discussion that wiki change logs
>>>>> doesn't.  JIRA just seems like a more natural forum to spur
>>>>> discussion.
>>>>> 
>>>>> Also, having the person driving the change updating the document tends
>>>>> to keep the discussion on track and making progress.
>>>>> 
>>>>> Finally, new or less senior members of the community may be reluctant
>>>>> to edit a semi-official project document like a wiki, but hopefully
>>>>> will be willing to join in the discussion on JIRA.
>>>>> -jg
>>>>> 
>>>>> 
>>>>> On Wed, Jul 20, 2011 at 9:56 AM, Alan D. Cabrera <li...@toolazydogs.com> wrote:
>>>>>> 
>>>>>> On Jul 20, 2011, at 9:51 AM, Jakob Homan wrote:
>>>>>> 
>>>>>>>>> and then just comment and iterate over there. Is that not the preferred way?
>>>>>>>> 
>>>>>>>> No, that's very bad.  There's no way that others can participate and modify the design.
>>>>>>>> 
>>>>>>> 
>>>>>>> How so?  The documentation is online and the discussion is online and
>>>>>>> recorded for posterity. The only barrier to entry to the discussion is
>>>>>>> setting up a JIRA account.
>>>>>> 
>>>>>> The design document should be open to the community to edit.  Not a frozen PDF document.  I'll turn the question around.  What problem do you see storing the document in a wiki format?
>>>>>> 
>>>>>> 
>>>>>> Regards,
>>>>>> Alan
>>>>>> 
>>>>>> 
>>>> 
>>>> 
>> 
>> 


Re: Kafka questions

Posted by Jakob Homan <jg...@gmail.com>.
I'm not meaning to push back; just curious as to the drawbacks of PDFs
versus wikis.  I was surprised to see PDF-backed design docs described
as "very bad" when I've seen this approach work well in multiple
projects (e.g. HDFS-265, MAPREDUCE-326, HBASE-3857, ZOOKEEPER-1016,
BOOKKEEPER-11, HDFS-1073, HIVE-1555).  Posting PDFs and the
change-driver incorporating feedback until consensus emerges seems
like a more natural counterpart to our SOP of posting patches and the
coder incorporating feedback until +1 is given.
-jg


On Wed, Jul 20, 2011 at 10:41 AM, Alan D. Cabrera <li...@toolazydogs.com> wrote:
> Yeah, that's the point of a wiki, sharing.  One person should not own a design doc.  Frankly, I don't understand the push back for such a simple document.
>
>
> Regards,
> Alan
>
>
> On Jul 20, 2011, at 10:34 AM, Jakob Homan wrote:
>
>> Doesn't need to be, but could be.  It's usually up to the person
>> proposing the change/driving the discussion to create new versions of
>> the PDF.  In my experience, when people attached the Word doc, others
>> would complain that they didn't have Word, and when people attached,
>> e.g., the laTex document people would have complain they didn't know
>> ancient Egyptian...
>> -jg
>>
>> On Wed, Jul 20, 2011 at 10:19 AM, Alan D. Cabrera <li...@toolazydogs.com> wrote:
>>> Are you saying that the source document for the PDF is also attached to the issue?  I don't see it in KAFKA-50.
>>>
>>>
>>> Regards,
>>> Alan
>>>
>>> On Jul 20, 2011, at 10:11 AM, Jakob Homan wrote:
>>>
>>>> I don't have anything against wikis - they're great for information
>>>> that changes more frequently than releases are made and should be
>>>> user-facing (configuration, FAQs, etc).
>>>>
>>>> For large technical changes, like the one currently being propsosed,
>>>> the PDF isn't static, but will have several versions posted.  The
>>>> whole discussion is: PDF version 0, then comments on that PDF, then
>>>> PDFv1, then more discussions until eventually the discussion turns
>>>> into +1s and the final version of the PDF is attached.  The JIRA does
>>>> a good job of chronicling the discussion that wiki change logs
>>>> doesn't.  JIRA just seems like a more natural forum to spur
>>>> discussion.
>>>>
>>>> Also, having the person driving the change updating the document tends
>>>> to keep the discussion on track and making progress.
>>>>
>>>> Finally, new or less senior members of the community may be reluctant
>>>> to edit a semi-official project document like a wiki, but hopefully
>>>> will be willing to join in the discussion on JIRA.
>>>> -jg
>>>>
>>>>
>>>> On Wed, Jul 20, 2011 at 9:56 AM, Alan D. Cabrera <li...@toolazydogs.com> wrote:
>>>>>
>>>>> On Jul 20, 2011, at 9:51 AM, Jakob Homan wrote:
>>>>>
>>>>>>>> and then just comment and iterate over there. Is that not the preferred way?
>>>>>>>
>>>>>>> No, that's very bad.  There's no way that others can participate and modify the design.
>>>>>>>
>>>>>>
>>>>>> How so?  The documentation is online and the discussion is online and
>>>>>> recorded for posterity. The only barrier to entry to the discussion is
>>>>>> setting up a JIRA account.
>>>>>
>>>>> The design document should be open to the community to edit.  Not a frozen PDF document.  I'll turn the question around.  What problem do you see storing the document in a wiki format?
>>>>>
>>>>>
>>>>> Regards,
>>>>> Alan
>>>>>
>>>>>
>>>
>>>
>
>

Re: Kafka questions

Posted by "Alan D. Cabrera" <li...@toolazydogs.com>.
Yeah, that's the point of a wiki, sharing.  One person should not own a design doc.  Frankly, I don't understand the push back for such a simple document.


Regards,
Alan


On Jul 20, 2011, at 10:34 AM, Jakob Homan wrote:

> Doesn't need to be, but could be.  It's usually up to the person
> proposing the change/driving the discussion to create new versions of
> the PDF.  In my experience, when people attached the Word doc, others
> would complain that they didn't have Word, and when people attached,
> e.g., the laTex document people would have complain they didn't know
> ancient Egyptian...
> -jg
> 
> On Wed, Jul 20, 2011 at 10:19 AM, Alan D. Cabrera <li...@toolazydogs.com> wrote:
>> Are you saying that the source document for the PDF is also attached to the issue?  I don't see it in KAFKA-50.
>> 
>> 
>> Regards,
>> Alan
>> 
>> On Jul 20, 2011, at 10:11 AM, Jakob Homan wrote:
>> 
>>> I don't have anything against wikis - they're great for information
>>> that changes more frequently than releases are made and should be
>>> user-facing (configuration, FAQs, etc).
>>> 
>>> For large technical changes, like the one currently being propsosed,
>>> the PDF isn't static, but will have several versions posted.  The
>>> whole discussion is: PDF version 0, then comments on that PDF, then
>>> PDFv1, then more discussions until eventually the discussion turns
>>> into +1s and the final version of the PDF is attached.  The JIRA does
>>> a good job of chronicling the discussion that wiki change logs
>>> doesn't.  JIRA just seems like a more natural forum to spur
>>> discussion.
>>> 
>>> Also, having the person driving the change updating the document tends
>>> to keep the discussion on track and making progress.
>>> 
>>> Finally, new or less senior members of the community may be reluctant
>>> to edit a semi-official project document like a wiki, but hopefully
>>> will be willing to join in the discussion on JIRA.
>>> -jg
>>> 
>>> 
>>> On Wed, Jul 20, 2011 at 9:56 AM, Alan D. Cabrera <li...@toolazydogs.com> wrote:
>>>> 
>>>> On Jul 20, 2011, at 9:51 AM, Jakob Homan wrote:
>>>> 
>>>>>>> and then just comment and iterate over there. Is that not the preferred way?
>>>>>> 
>>>>>> No, that's very bad.  There's no way that others can participate and modify the design.
>>>>>> 
>>>>> 
>>>>> How so?  The documentation is online and the discussion is online and
>>>>> recorded for posterity. The only barrier to entry to the discussion is
>>>>> setting up a JIRA account.
>>>> 
>>>> The design document should be open to the community to edit.  Not a frozen PDF document.  I'll turn the question around.  What problem do you see storing the document in a wiki format?
>>>> 
>>>> 
>>>> Regards,
>>>> Alan
>>>> 
>>>> 
>> 
>> 


Re: Kafka questions

Posted by Jakob Homan <jg...@gmail.com>.
Doesn't need to be, but could be.  It's usually up to the person
proposing the change/driving the discussion to create new versions of
the PDF.  In my experience, when people attached the Word doc, others
would complain that they didn't have Word, and when people attached,
e.g., the laTex document people would have complain they didn't know
ancient Egyptian...
-jg

On Wed, Jul 20, 2011 at 10:19 AM, Alan D. Cabrera <li...@toolazydogs.com> wrote:
> Are you saying that the source document for the PDF is also attached to the issue?  I don't see it in KAFKA-50.
>
>
> Regards,
> Alan
>
> On Jul 20, 2011, at 10:11 AM, Jakob Homan wrote:
>
>> I don't have anything against wikis - they're great for information
>> that changes more frequently than releases are made and should be
>> user-facing (configuration, FAQs, etc).
>>
>> For large technical changes, like the one currently being propsosed,
>> the PDF isn't static, but will have several versions posted.  The
>> whole discussion is: PDF version 0, then comments on that PDF, then
>> PDFv1, then more discussions until eventually the discussion turns
>> into +1s and the final version of the PDF is attached.  The JIRA does
>> a good job of chronicling the discussion that wiki change logs
>> doesn't.  JIRA just seems like a more natural forum to spur
>> discussion.
>>
>> Also, having the person driving the change updating the document tends
>> to keep the discussion on track and making progress.
>>
>> Finally, new or less senior members of the community may be reluctant
>> to edit a semi-official project document like a wiki, but hopefully
>> will be willing to join in the discussion on JIRA.
>> -jg
>>
>>
>> On Wed, Jul 20, 2011 at 9:56 AM, Alan D. Cabrera <li...@toolazydogs.com> wrote:
>>>
>>> On Jul 20, 2011, at 9:51 AM, Jakob Homan wrote:
>>>
>>>>>> and then just comment and iterate over there. Is that not the preferred way?
>>>>>
>>>>> No, that's very bad.  There's no way that others can participate and modify the design.
>>>>>
>>>>
>>>> How so?  The documentation is online and the discussion is online and
>>>> recorded for posterity. The only barrier to entry to the discussion is
>>>> setting up a JIRA account.
>>>
>>> The design document should be open to the community to edit.  Not a frozen PDF document.  I'll turn the question around.  What problem do you see storing the document in a wiki format?
>>>
>>>
>>> Regards,
>>> Alan
>>>
>>>
>
>

Re: Kafka questions

Posted by "Alan D. Cabrera" <li...@toolazydogs.com>.
Are you saying that the source document for the PDF is also attached to the issue?  I don't see it in KAFKA-50.


Regards,
Alan

On Jul 20, 2011, at 10:11 AM, Jakob Homan wrote:

> I don't have anything against wikis - they're great for information
> that changes more frequently than releases are made and should be
> user-facing (configuration, FAQs, etc).
> 
> For large technical changes, like the one currently being propsosed,
> the PDF isn't static, but will have several versions posted.  The
> whole discussion is: PDF version 0, then comments on that PDF, then
> PDFv1, then more discussions until eventually the discussion turns
> into +1s and the final version of the PDF is attached.  The JIRA does
> a good job of chronicling the discussion that wiki change logs
> doesn't.  JIRA just seems like a more natural forum to spur
> discussion.
> 
> Also, having the person driving the change updating the document tends
> to keep the discussion on track and making progress.
> 
> Finally, new or less senior members of the community may be reluctant
> to edit a semi-official project document like a wiki, but hopefully
> will be willing to join in the discussion on JIRA.
> -jg
> 
> 
> On Wed, Jul 20, 2011 at 9:56 AM, Alan D. Cabrera <li...@toolazydogs.com> wrote:
>> 
>> On Jul 20, 2011, at 9:51 AM, Jakob Homan wrote:
>> 
>>>>> and then just comment and iterate over there. Is that not the preferred way?
>>>> 
>>>> No, that's very bad.  There's no way that others can participate and modify the design.
>>>> 
>>> 
>>> How so?  The documentation is online and the discussion is online and
>>> recorded for posterity. The only barrier to entry to the discussion is
>>> setting up a JIRA account.
>> 
>> The design document should be open to the community to edit.  Not a frozen PDF document.  I'll turn the question around.  What problem do you see storing the document in a wiki format?
>> 
>> 
>> Regards,
>> Alan
>> 
>> 


Re: Kafka questions

Posted by Jakob Homan <jg...@gmail.com>.
I don't have anything against wikis - they're great for information
that changes more frequently than releases are made and should be
user-facing (configuration, FAQs, etc).

For large technical changes, like the one currently being propsosed,
the PDF isn't static, but will have several versions posted.  The
whole discussion is: PDF version 0, then comments on that PDF, then
PDFv1, then more discussions until eventually the discussion turns
into +1s and the final version of the PDF is attached.  The JIRA does
a good job of chronicling the discussion that wiki change logs
doesn't.  JIRA just seems like a more natural forum to spur
discussion.

Also, having the person driving the change updating the document tends
to keep the discussion on track and making progress.

Finally, new or less senior members of the community may be reluctant
to edit a semi-official project document like a wiki, but hopefully
will be willing to join in the discussion on JIRA.
-jg


On Wed, Jul 20, 2011 at 9:56 AM, Alan D. Cabrera <li...@toolazydogs.com> wrote:
>
> On Jul 20, 2011, at 9:51 AM, Jakob Homan wrote:
>
>>>> and then just comment and iterate over there. Is that not the preferred way?
>>>
>>> No, that's very bad.  There's no way that others can participate and modify the design.
>>>
>>
>> How so?  The documentation is online and the discussion is online and
>> recorded for posterity. The only barrier to entry to the discussion is
>> setting up a JIRA account.
>
> The design document should be open to the community to edit.  Not a frozen PDF document.  I'll turn the question around.  What problem do you see storing the document in a wiki format?
>
>
> Regards,
> Alan
>
>

Re: Kafka questions

Posted by "Alan D. Cabrera" <li...@toolazydogs.com>.
On Jul 20, 2011, at 9:51 AM, Jakob Homan wrote:

>>> and then just comment and iterate over there. Is that not the preferred way?
>> 
>> No, that's very bad.  There's no way that others can participate and modify the design.
>> 
> 
> How so?  The documentation is online and the discussion is online and
> recorded for posterity. The only barrier to entry to the discussion is
> setting up a JIRA account.

The design document should be open to the community to edit.  Not a frozen PDF document.  I'll turn the question around.  What problem do you see storing the document in a wiki format?


Regards,
Alan


Re: Kafka questions

Posted by Jakob Homan <jg...@gmail.com>.
>> and then just comment and iterate over there. Is that not the preferred way?
>
> No, that's very bad.  There's no way that others can participate and modify the design.
>

How so?  The documentation is online and the discussion is online and
recorded for posterity. The only barrier to entry to the discussion is
setting up a JIRA account.

Re: Kafka questions

Posted by "Alan D. Cabrera" <li...@toolazydogs.com>.
On Jul 20, 2011, at 9:35 AM, Jun Rao wrote:

> Alan,
> 
> I saw other apache projects just post design docs as attachments in the jira
> and then just comment and iterate over there. Is that not the preferred way?

No, that's very bad.  There's no way that others can participate and modify the design.

> Also, I added a couple of kafka paper/presentations in kafka wiki.
> https://cwiki.apache.org/confluence/display/KAFKA/Index
> 
> Jun
> 
> On Wed, Jul 20, 2011 at 9:15 AM, Alan D. Cabrera <li...@toolazydogs.com>wrote:
> 
>> It would be good to move the content of the PDF files to the wiki so that
>> the community can participate in the design.  These PDF files need to be
>> removed.
>> 
>> 
>> Regards,
>> Alan
>> 
>> On Jul 20, 2011, at 9:07 AM, Jun Rao wrote:
>> 
>>> Oliver,
>>> 
>>> We have a design for replication (see the design doc and subtasks at
>>> https://issues.apache.org/jira/browse/KAFKA-50). We are currently
>> wrapping
>>> up the compression support and will start working on replication soon.
>>> 
>>> Jun
>>> 
>>> On Tue, Jul 19, 2011 at 12:59 PM, Olivier Pomel <ol...@datadoghq.com>
>> wrote:
>>> 
>>>> Thanks, guys, this was a great thread. May be worth pointing to it in
>> the
>>>> online docs as it asks and answers a lot of interesting questions about
>> the
>>>> performance characteristics and tradeoffs made in Kafka.
>>>> 
>>>> How far out do you think built-in replication is?
>>>> Best,
>>>> O.
>>>> 
>>>> 
>>>> 
>>>> On Tue, Jul 19, 2011 at 3:23 PM, Jay Kreps <ja...@gmail.com> wrote:
>>>> 
>>>>> Agreed, no reason the policy to hand out messages should not be
>>>>> configurable. We were hoping to make the whole question irrelevant with
>>>> the
>>>>> replication since then the producer can choose the replication level it
>>>>> wants and fsync durability should be less of a concern.
>>>>> 
>>>>> I agree with your comment that a good implementation of streaming with
>>>> acks
>>>>> being potentially superior.
>>>>> 
>>>>> -Jay
>>>>> 
>>>>> On Tue, Jul 19, 2011 at 11:15 AM, Paul Sutter <psutter@quantbench.com
>>>>>> wrote:
>>>>> 
>>>>>> Jay,
>>>>>> 
>>>>>> Ah - thanks for the clarification on the delay in the broker. It would
>>>> be
>>>>>> nice to if that were a configuration option, so that the end user can
>>>>>> choose
>>>>>> only to forward messages that have been written to disk, or choose to
>>>>> have
>>>>>> the data forwarded immediately. When you implement replication data
>>>>> hitting
>>>>>> the disk will matter less.
>>>>>> 
>>>>>> On the delay in the producer, I think it could best be resolved
>> through
>>>>>> measurement. In your paper you compare two different approaches, and
>>>> I'm
>>>>>> proposing a third:
>>>>>> 
>>>>>> 1. Send and wait (single message, JMS style)
>>>>>> 2. Batch, send, and wait (Kafka today)
>>>>>> 3. Stream with ACKs
>>>>>> 
>>>>>> Removing any wait for a reply should increase throughput, not decrease
>>>>> it,
>>>>>> so you're likely trading latency against potential CPU efficiency. And
>>>>> the
>>>>>> CPU savings is a question best resolved by measurement.
>>>>>> 
>>>>>> I'd also encourage you to think about the WAN case. When you
>>>>> send-and-wait,
>>>>>> you need to send a buffer that is >> the bandwidth delay product to
>>>>>> approach
>>>>>> full line utilization, and the line will go idle for one RTT while you
>>>>> stop
>>>>>> to wait for a reply. The bandwidth*delay product can get large (10s of
>>>>>> megabytes), and end users will rarely understand the need to tune the
>>>>> batch
>>>>>> size to increase throughput. They'll just say it's slow over long
>>>>>> distances.
>>>>>> 
>>>>>> All that said - your use case doesn't require minimizing latency or
>> WAN
>>>>>> use,
>>>>>> so I can really understand if this isn't a priority for you.
>>>>>> 
>>>>>> It's a well designed product that has had some real thought put into
>>>> it.
>>>>>> It's a really promising system, thanks for taking the time to respond
>>>> to
>>>>> my
>>>>>> comments.
>>>>>> 
>>>>>> Paul
>>>>>> 
>>>>>> On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <ja...@gmail.com>
>>>> wrote:
>>>>>> 
>>>>>>> Ah, I think what you are describing in zeromq is essentially the
>>>>>> equivalent
>>>>>>> of group commit for the socket. Essentially you wait until the socket
>>>>> is
>>>>>> no
>>>>>>> longer writable and then begin to queue data. This is an interesting
>>>>>> idea.
>>>>>>> Of course it would only have a positive effect when you had already
>>>>>>> overflowed the socket buffer and were sending a very high throughput
>>>> of
>>>>>>> small messages. That basically is a way to degrade an overloaded
>>>>>>> synchronous
>>>>>>> send into a batched send. This is not really the same as what we have
>>>>>> done,
>>>>>>> which is to allow the ability to trade off latency for throughput in
>>>> a
>>>>>>> configurable manner. The reason the later is important is that we do
>>>>> not
>>>>>>> have a handful of producers sending at a rate that saturates the
>>>>> network
>>>>>>> I/O
>>>>>>> capacity of those servers (the case where the group commit would
>>>> help)
>>>>>> but
>>>>>>> rather we have thousands of producers sending at a medium low volume,
>>>>> so
>>>>>> we
>>>>>>> would never hit that in our use case. The advantage of batching is
>>>>> fewer
>>>>>>> requests that hit the server, and larger packets. Where the group
>>>>> commit
>>>>>>> would help is for the synchronous producer benchmarks, where you
>>>> could
>>>>>>> potentially get much better throughput. This is something we should
>>>>>>> consider
>>>>>>> adding.
>>>>>>> 
>>>>>>> To be clear, though, we have not added latency in our layer, just
>>>> made
>>>>> a
>>>>>>> configurable way to trade-off latency for throughput. This is
>>>>>> unambiguously
>>>>>>> a good thing, I think.
>>>>>>> 
>>>>>>> With respect to mmap, i think you are misunderstanding where the
>>>>> latency
>>>>>>> comes from. We immediately write data to the filesystem with no delay
>>>>>>> whatsoever. This incurs the overhead of a system call, as you point
>>>>> out,
>>>>>>> which could be avoided by mmap, but that doesn't add much in the way
>>>> of
>>>>>>> latency. The latency comes from the fact that we do not make the
>>>>> written
>>>>>>> data available to consumers until we fsync the file to "ensure" the
>>>>>>> durability of consumed messages. The frequency of the fsync is
>>>>>>> configurable,
>>>>>>> anything either immediate or with a time or # messages threshold.
>>>> This
>>>>>>> again
>>>>>>> trades latency for throughput.
>>>>>>> 
>>>>>>> -Jay
>>>>>>> 
>>>>>>> On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <
>>>> psutter@quantbench.com
>>>>>>>> wrote:
>>>>>>> 
>>>>>>>> *Producer latency* - I'm not familiar with zeromq internals but my
>>>>>>>> understanding is that they send the first message(s) immediately
>>>> and
>>>>> as
>>>>>>> TCP
>>>>>>>> queues up the data, it will eventually block as the send buffer
>>>>> fills,
>>>>>>> and
>>>>>>>> during this time messages can queue up, and thte net-net is that on
>>>>>>> average
>>>>>>>> the number of system calls is << the number of messages. The key is
>>>>>>> having
>>>>>>>> a
>>>>>>>> separate thread for network operations with very efficient thread
>>>>>>>> coordination.  Clearly Nagle is disabled (TCP_NODELAY) as Nagle is
>>>> a
>>>>>>> blight
>>>>>>>> against humanity.
>>>>>>>> 
>>>>>>>> Having any sort of delay adds latency. If every developer thinks
>>>> its
>>>>> OK
>>>>>>> to
>>>>>>>> add a little latency in his layer, pretty soon you end up with 10
>>>>>> second
>>>>>>>> end
>>>>>>>> to end latency.
>>>>>>>> 
>>>>>>>> Having an "accumulated message count" is also bad for WAN
>>>>> performance.
>>>>>> If
>>>>>>>> your "window size" is a set of delayed messages, the only way to
>>>> deal
>>>>>>> with
>>>>>>>> a
>>>>>>>> large bandwidth*delay product is to delay a lot of messages, then
>>>>> send
>>>>>>>> them.
>>>>>>>> You can fit a lot of data into a fiber. Imagine a gigabit link with
>>>>>> 100ms
>>>>>>>> roundtrip time, you can store 100MB in the fiber. And you need a
>>>>>>> multiples
>>>>>>>> of that for buffering if you need to do a retransmit.
>>>>>>>> 
>>>>>>>> *Broker Latency *- With mmap the memcpy() of the message should
>>>> make
>>>>>> the
>>>>>>>> data available to a thread even in another process, the pages that
>>>>> you
>>>>>>> have
>>>>>>>> mapped are also in the buffer cache and available to a sendfile()
>>>>> call.
>>>>>>> or
>>>>>>>> at least I think so. The flush to physical disk (or msync() in this
>>>>>> case)
>>>>>>>> would still be delayed but without impacting end to end latency.
>>>>>>>> 
>>>>>>>> That said, in benchmarks I have done the fastest IO with the lowest
>>>>> CPU
>>>>>>>> overhead is unbuffered (direct) IO (which is lower overhead than
>>>>> using
>>>>>>> the
>>>>>>>> buffer cache with or without memory mapping), but then you'd have
>>>> to
>>>>>>> manage
>>>>>>>> your own buffer pool and run your broker in a single multithreaded
>>>>>>> process.
>>>>>>>> But thats getting more extreme. Just getting rid of this buffer
>>>> write
>>>>>>> delay
>>>>>>>> by using memory mapping will remove a big chunk of latency.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <ja...@gmail.com>
>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi Paul,
>>>>>>>>> 
>>>>>>>>> We are definitely interested in lowering latency--lower is always
>>>>>>>>> better--but that was not a major concern for us so far (we were
>>>>>>> replacing
>>>>>>>> a
>>>>>>>>> system with 1 hour latency), so we haven't focused on it yet. As
>>>>> you
>>>>>>>>> describe latency in our setup at linkedin comes from batching on
>>>>> the
>>>>>>>>> frontend and batching on the kafka servers do to very lenient
>>>> flush
>>>>>>>>> settings.
>>>>>>>>> 
>>>>>>>>> I am interested in your comments on zeromq. Do they actually have
>>>> a
>>>>>>>> better
>>>>>>>>> approach for this problem even when using TCP? If so I would be
>>>>>>>> interested
>>>>>>>>> to understand. The way I see things this is about trading
>>>>> throughput
>>>>>>> and
>>>>>>>>> latency. On the producer side you have only a few options:
>>>>>> immediately
>>>>>>>>> write
>>>>>>>>> the data to the socket buffer for sending or wait and see if the
>>>>>>>>> application
>>>>>>>>> writes more data. The OS will do this for you unless you set
>>>>>>> TCP_NODELAY,
>>>>>>>>> but the OS is relatively inflexible, it doesn't understand your
>>>>> data
>>>>>> so
>>>>>>> I
>>>>>>>>> think it just waits 200ms or until the socket buffer is full.
>>>>>>>>> 
>>>>>>>>> The current approach in the async producer captures the same
>>>>>> tradeoff,
>>>>>>>> but
>>>>>>>>> a
>>>>>>>>> little more flexibly, it allows you to specify a max delay and
>>>> max
>>>>>>>>> accumulated message count, data is written when either of those
>>>> is
>>>>>> hit.
>>>>>>>>> 
>>>>>>>>> Is it possible to better capture this tradeoff? Basically I am
>>>> not
>>>>>>> aware
>>>>>>>> of
>>>>>>>>> any other trick here if you are using TCP, so i would be
>>>> interested
>>>>>> in
>>>>>>>> what
>>>>>>>>> zeromq does if they are doing this better.
>>>>>>>>> 
>>>>>>>>> We do indeed write each message set to the filesystem as it
>>>> arrives
>>>>>> but
>>>>>>>> we
>>>>>>>>> distribute messages to consumers only after the write has been
>>>>>> flushed
>>>>>>> to
>>>>>>>>> disk, delaying (batching) that flush is the cause of the latency
>>>>> but
>>>>>>> also
>>>>>>>>> gives better use of IOPs by generating larger writes. Mmap would
>>>>>> remove
>>>>>>>> the
>>>>>>>>> system call (which would be good), but not the flush I think. As
>>>>> you
>>>>>>> say,
>>>>>>>>> adding replication allows giving stronger guarantees without
>>>>> actually
>>>>>>>>> caring
>>>>>>>>> about durability on a particular server which would make it
>>>>> possible
>>>>>> to
>>>>>>>>> distribute messages to consumers after ack from some number of
>>>>> other
>>>>>>>>> servers
>>>>>>>>> irrespective of flushing to disk.
>>>>>>>>> 
>>>>>>>>> -Jay
>>>>>>>>> 
>>>>>>>>> On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <
>>>>> psutter@quantbench.com
>>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Jun
>>>>>>>>>> 
>>>>>>>>>> Thanks for your answers and the link to the paper - that helps
>>>> a
>>>>>> lot,
>>>>>>>>>> especially the comment in the paper that 10 second end to end
>>>>>> latency
>>>>>>>> is
>>>>>>>>>> good enough for your intended use case.
>>>>>>>>>> 
>>>>>>>>>> We're looking for much lower latencies, and the basic design of
>>>>>> Kafka
>>>>>>>>> feels
>>>>>>>>>> like it should support latencies in milliseconds with a few
>>>>>> changes.
>>>>>>>>> We're
>>>>>>>>>> either going to build our own system, or help develop something
>>>>>> that
>>>>>>>>>> already
>>>>>>>>>> exists, so please take my comments in the constructive way
>>>>> they're
>>>>>>>>> intended
>>>>>>>>>> (I realize the changes I'm suggesting are outside your intended
>>>>> use
>>>>>>>> case,
>>>>>>>>>> but if you're interested we may be able to provide a very
>>>> capable
>>>>>>>>> developer
>>>>>>>>>> to help with the work, assuming we choose kafka over the other
>>>>>>> zillion
>>>>>>>>>> streaming systems that are coming out of the woodwork).
>>>>>>>>>> 
>>>>>>>>>> a. *Producer "queue.time"* - In my question 4 below, I was
>>>>>> referring
>>>>>>> to
>>>>>>>>> the
>>>>>>>>>> producer queue time.  With a default value of 5 seconds, that
>>>>>>> accounts
>>>>>>>>> for
>>>>>>>>>> half your end to end latency. A system like zeromq is optimized
>>>>> to
>>>>>>>> write
>>>>>>>>>> data immediately without delay, but in such a way to minimizes
>>>>> the
>>>>>>>> number
>>>>>>>>>> of
>>>>>>>>>> system calls required during high throughput messages. Zeromq
>>>> is
>>>>> no
>>>>>>>>>> nirvana,
>>>>>>>>>> but it has a number of nice properties.
>>>>>>>>>> 
>>>>>>>>>> b. *Broker "log.default.flush.interval.ms"* - The default
>>>> value
>>>>> of
>>>>>> 3
>>>>>>>>>> seconds
>>>>>>>>>> appears to be another significant source of latency in the
>>>>> system,
>>>>>>>>> assuming
>>>>>>>>>> that clients are unable to access data until it has been
>>>> flushed.
>>>>>>> Since
>>>>>>>>> you
>>>>>>>>>> have wisely chosen to take advantage of the buffer cache as
>>>> part
>>>>> of
>>>>>>>> your
>>>>>>>>>> system design, it seems that you could remove this latency
>>>>>> completely
>>>>>>>> by
>>>>>>>>>> memory mapping the partitions and memcpying each message as it
>>>>>>> arrives.
>>>>>>>>>> With
>>>>>>>>>> the right IPC mechanism clients could have immediate access to
>>>>> new
>>>>>>>>>> messages.
>>>>>>>>>> 
>>>>>>>>>> c. *Batching, sync vs async, replication, and auditing*. Its
>>>>>>>>> understandable
>>>>>>>>>> that you've chosen a a forensic approach to producer
>>>> reliability
>>>>>>> (after
>>>>>>>>> the
>>>>>>>>>> fact auditing), but when you implement replication it would be
>>>>>> really
>>>>>>>>> nice
>>>>>>>>>> to revise the producer protocol mechanisms. If you used a
>>>>> streaming
>>>>>>>>>> mechanism with producer offsets and ACKs, you could ensure
>>>>> reliable
>>>>>>>>>> delivery
>>>>>>>>>> of producer streams to multiple brokers without the need to
>>>>> choose
>>>>>> a
>>>>>>>>> "batch
>>>>>>>>>> size" or "queue.time". This could also give you active/active
>>>>>>> failover
>>>>>>>> of
>>>>>>>>>> brokers. This may also help in the WAN case (my question 3
>>>> below)
>>>>>>>> because
>>>>>>>>>> you will be able to adaptively stuff more and more data through
>>>>> the
>>>>>>>> fiber
>>>>>>>>>> for high bandwidth*delay links without having to choose a large
>>>>>>> "batch
>>>>>>>>>> size"
>>>>>>>>>> nor have the additional latency that entails. Oh, and it will
>>>>> help
>>>>>>> you
>>>>>>>>> deal
>>>>>>>>>> with CRC errors once you start checking for them.
>>>>>>>>>> 
>>>>>>>>>> c. *Performance measurements* - I'd like to make a suggestion
>>>> for
>>>>>>> your
>>>>>>>>>> performance measurements. Your benchmarks measure throughput,
>>>> but
>>>>> a
>>>>>>>>>> throughput number is meaningless without an associated "% cpu
>>>>>> time".
>>>>>>>>>> Ideally
>>>>>>>>>> all measurements achieve wire speed (100MB/sec) at 0% CPU
>>>> (since,
>>>>>>> after
>>>>>>>>>> all,
>>>>>>>>>> this is plumbing and we assume the cores in the system should
>>>>> have
>>>>>>>>> capacity
>>>>>>>>>> set aside for useful work too). Obviously nobody ever achieves
>>>>>> this,
>>>>>>>> but
>>>>>>>>> by
>>>>>>>>>> measuring it one can raise the bar in terms of optimization.
>>>>>>>>>> 
>>>>>>>>>> Paul
>>>>>>>>>> 
>>>>>>>>>> ps. Just for background, I am the cofounder at Quantcast where
>>>> we
>>>>>>>> process
>>>>>>>>>> 3.5PB of data per day. These questions are related to my new
>>>>>> startup
>>>>>>>>>> Quantbench which will deal with financial market data where you
>>>>>> dont
>>>>>>>> want
>>>>>>>>>> any latency at all. And WAN issues are a big deal too.
>>>>>> Incidentally,
>>>>>>> I
>>>>>>>>> was
>>>>>>>>>> also founder of Orbital Data which was a WAN optimization
>>>> company
>>>>>> so
>>>>>>>> I've
>>>>>>>>>> done a lot of work with protocols over long distances.
>>>>>>>>>> 
>>>>>>>>>> On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <ju...@gmail.com>
>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Paul,
>>>>>>>>>>> 
>>>>>>>>>>> Excellent questions. See my answers below. Thanks,
>>>>>>>>>>> 
>>>>>>>>>>> On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <
>>>>>>> psutter@quantbench.com
>>>>>>>>> 
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Kafka looks like an exciting project, thanks for opening it
>>>>> up.
>>>>>>>>>>>> 
>>>>>>>>>>>> I have a few questions:
>>>>>>>>>>>> 
>>>>>>>>>>>> 1. Are checksums end to end (ie, created by the producer
>>>> and
>>>>>>>> checked
>>>>>>>>> by
>>>>>>>>>>> the
>>>>>>>>>>>> consumer)? or are they only used to confirm buffercache
>>>>>> behavior
>>>>>>> on
>>>>>>>>>> disk
>>>>>>>>>>> as
>>>>>>>>>>>> mentioned in the documentation? Bit errors occur vastly
>>>> more
>>>>>>> often
>>>>>>>>> than
>>>>>>>>>>>> most
>>>>>>>>>>>> people assume, often because of device driver bugs. TCP
>>>> only
>>>>>>>> detects
>>>>>>>>> 1
>>>>>>>>>>>> error
>>>>>>>>>>>> in 65536, so errors can flow through (if you like I can
>>>> send
>>>>>>> links
>>>>>>>> to
>>>>>>>>>>>> papers
>>>>>>>>>>>> describing the need for checksums everywhere).
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Checksum is generated at the producer and propagated to the
>>>>>> broker
>>>>>>>> and
>>>>>>>>>>> eventually the consumer. Currently, we only validate the
>>>>> checksum
>>>>>>> at
>>>>>>>>> the
>>>>>>>>>>> broker. We could further validate it at the consumer in the
>>>>>> future.
>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 2. The consumer has a pretty solid mechanism to ensure it
>>>>> hasnt
>>>>>>>>> missed
>>>>>>>>>>> any
>>>>>>>>>>>> messages (i like the design by the way), but how does the
>>>>>>> producer
>>>>>>>>> know
>>>>>>>>>>>> that
>>>>>>>>>>>> all of its messages have been stored? (no apparent message
>>>> id
>>>>>> on
>>>>>>>> that
>>>>>>>>>>> side
>>>>>>>>>>>> since the message id isnt known until the message is
>>>> written
>>>>> to
>>>>>>> the
>>>>>>>>>>> file).
>>>>>>>>>>>> I'm especially curious how failover/replication could be
>>>>>>>> implemented
>>>>>>>>>> and
>>>>>>>>>>>> I'm
>>>>>>>>>>>> thinking that acks on the publisher side may help)
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> The producer side auditing is not built-in. At LinkedIn, we
>>>> do
>>>>>> that
>>>>>>>> by
>>>>>>>>>>> generating an auditing event periodically in the eventhandler
>>>>> of
>>>>>>> the
>>>>>>>>>> async
>>>>>>>>>>> producer. The auditing event contains the number of events
>>>>>> produced
>>>>>>>> in
>>>>>>>>> a
>>>>>>>>>>> configured window (e.g., 10 minutes) and are sent to a
>>>> separate
>>>>>>>> topic.
>>>>>>>>>> The
>>>>>>>>>>> consumer can read the actual data and the auditing event and
>>>>>>> compare
>>>>>>>>> the
>>>>>>>>>>> counts. See our paper (
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
>>>>>>>>>>> )
>>>>>>>>>>> for some more details.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 3. Has the consumer's flow control been tested over high
>>>>>>>>>> bandwidth*delay
>>>>>>>>>>>> links? (what bandwidth can you get from a London consumer
>>>> of
>>>>> an
>>>>>>> SF
>>>>>>>>>>>> cluster?)
>>>>>>>>>>>> 
>>>>>>>>>>>> Yes, we actually replicate kafka data across data centers,
>>>>>> using
>>>>>>> an
>>>>>>>>>>> embedded consumer in a broker. Again, there is a bit more
>>>> info
>>>>> on
>>>>>>>> this
>>>>>>>>> in
>>>>>>>>>>> our paper.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>>> 4. What kind of performance do you get if you set the
>>>>>> producer's
>>>>>>>>>> message
>>>>>>>>>>>> delay to zero? (ie, is there a separate system call for
>>>> each
>>>>>>>> message?
>>>>>>>>>> or
>>>>>>>>>>> do
>>>>>>>>>>>> you manage to aggregate messages into a smaller number of
>>>>>> system
>>>>>>>>> calls
>>>>>>>>>>> even
>>>>>>>>>>>> with a delay of 0?)
>>>>>>>>>>>> 
>>>>>>>>>>>> I assume that you are referring to the flush interval. One
>>>>> can
>>>>>>>>>> configure
>>>>>>>>>>> to
>>>>>>>>>>> flush every message to disk. This will slow down the
>>>> throughput
>>>>>>>>>>> significantly.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>>> 5. Have you considered using a library like zeromq for the
>>>>>>>> messaging
>>>>>>>>>>> layer
>>>>>>>>>>>> instead of rolling your own? (zeromq will handle #4 cleanly
>>>>> at
>>>>>>>>> millions
>>>>>>>>>>> of
>>>>>>>>>>>> messages per second and has clients in 20 languages)
>>>>>>>>>>>> 
>>>>>>>>>>>> No. Our proprietary format allows us to support things like
>>>>>>>>> compression
>>>>>>>>>>> in
>>>>>>>>>>> the future. However, we can definitely look into the zeromq
>>>>>> format.
>>>>>>>> Is
>>>>>>>>>>> their
>>>>>>>>>>> messaging layer easily extractable?
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>>> 6. Do you have any plans to support intermediate processing
>>>>>>>> elements
>>>>>>>>>> the
>>>>>>>>>>>> way
>>>>>>>>>>>> Flume supports?
>>>>>>>>>>>> 
>>>>>>>>>>>> For now, we are just focusing on getting the raw messaging
>>>>>> layer
>>>>>>>>> solid.
>>>>>>>>>>> We
>>>>>>>>>>> have worked a bit on streaming processing and will look into
>>>>> that
>>>>>>>> again
>>>>>>>>>> in
>>>>>>>>>>> the future.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>>> 7. The docs mention that new versions will only be released
>>>>>> after
>>>>>>>>> they
>>>>>>>>>>> are
>>>>>>>>>>>> in production at LinkedIn? Does that mean that the latest
>>>>>> version
>>>>>>>> of
>>>>>>>>>> the
>>>>>>>>>>>> source code is hidden at LinkedIn and contributors would
>>>> have
>>>>>> to
>>>>>>>>> throw
>>>>>>>>>>>> patches over the wall and wait months to get the integrated
>>>>>>>> product?
>>>>>>>>>>>> 
>>>>>>>>>>>> What we ran at LinkedIn is the same version in open source
>>>>> and
>>>>>>>> there
>>>>>>>>> is
>>>>>>>>>>> no
>>>>>>>>>>> internal repository of Kafka at LinkedIn. We plan to maintain
>>>>>> that
>>>>>>> in
>>>>>>>>> the
>>>>>>>>>>> future.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 
>> 


Re: Kafka questions

Posted by Jun Rao <ju...@gmail.com>.
Alan,

I saw other apache projects just post design docs as attachments in the jira
and then just comment and iterate over there. Is that not the preferred way?

Also, I added a couple of kafka paper/presentations in kafka wiki.
https://cwiki.apache.org/confluence/display/KAFKA/Index

Jun

On Wed, Jul 20, 2011 at 9:15 AM, Alan D. Cabrera <li...@toolazydogs.com>wrote:

> It would be good to move the content of the PDF files to the wiki so that
> the community can participate in the design.  These PDF files need to be
> removed.
>
>
> Regards,
> Alan
>
> On Jul 20, 2011, at 9:07 AM, Jun Rao wrote:
>
> > Oliver,
> >
> > We have a design for replication (see the design doc and subtasks at
> > https://issues.apache.org/jira/browse/KAFKA-50). We are currently
> wrapping
> > up the compression support and will start working on replication soon.
> >
> > Jun
> >
> > On Tue, Jul 19, 2011 at 12:59 PM, Olivier Pomel <ol...@datadoghq.com>
> wrote:
> >
> >> Thanks, guys, this was a great thread. May be worth pointing to it in
> the
> >> online docs as it asks and answers a lot of interesting questions about
> the
> >> performance characteristics and tradeoffs made in Kafka.
> >>
> >> How far out do you think built-in replication is?
> >> Best,
> >> O.
> >>
> >>
> >>
> >> On Tue, Jul 19, 2011 at 3:23 PM, Jay Kreps <ja...@gmail.com> wrote:
> >>
> >>> Agreed, no reason the policy to hand out messages should not be
> >>> configurable. We were hoping to make the whole question irrelevant with
> >> the
> >>> replication since then the producer can choose the replication level it
> >>> wants and fsync durability should be less of a concern.
> >>>
> >>> I agree with your comment that a good implementation of streaming with
> >> acks
> >>> being potentially superior.
> >>>
> >>> -Jay
> >>>
> >>> On Tue, Jul 19, 2011 at 11:15 AM, Paul Sutter <psutter@quantbench.com
> >>>> wrote:
> >>>
> >>>> Jay,
> >>>>
> >>>> Ah - thanks for the clarification on the delay in the broker. It would
> >> be
> >>>> nice to if that were a configuration option, so that the end user can
> >>>> choose
> >>>> only to forward messages that have been written to disk, or choose to
> >>> have
> >>>> the data forwarded immediately. When you implement replication data
> >>> hitting
> >>>> the disk will matter less.
> >>>>
> >>>> On the delay in the producer, I think it could best be resolved
> through
> >>>> measurement. In your paper you compare two different approaches, and
> >> I'm
> >>>> proposing a third:
> >>>>
> >>>> 1. Send and wait (single message, JMS style)
> >>>> 2. Batch, send, and wait (Kafka today)
> >>>> 3. Stream with ACKs
> >>>>
> >>>> Removing any wait for a reply should increase throughput, not decrease
> >>> it,
> >>>> so you're likely trading latency against potential CPU efficiency. And
> >>> the
> >>>> CPU savings is a question best resolved by measurement.
> >>>>
> >>>> I'd also encourage you to think about the WAN case. When you
> >>> send-and-wait,
> >>>> you need to send a buffer that is >> the bandwidth delay product to
> >>>> approach
> >>>> full line utilization, and the line will go idle for one RTT while you
> >>> stop
> >>>> to wait for a reply. The bandwidth*delay product can get large (10s of
> >>>> megabytes), and end users will rarely understand the need to tune the
> >>> batch
> >>>> size to increase throughput. They'll just say it's slow over long
> >>>> distances.
> >>>>
> >>>> All that said - your use case doesn't require minimizing latency or
> WAN
> >>>> use,
> >>>> so I can really understand if this isn't a priority for you.
> >>>>
> >>>> It's a well designed product that has had some real thought put into
> >> it.
> >>>> It's a really promising system, thanks for taking the time to respond
> >> to
> >>> my
> >>>> comments.
> >>>>
> >>>> Paul
> >>>>
> >>>> On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <ja...@gmail.com>
> >> wrote:
> >>>>
> >>>>> Ah, I think what you are describing in zeromq is essentially the
> >>>> equivalent
> >>>>> of group commit for the socket. Essentially you wait until the socket
> >>> is
> >>>> no
> >>>>> longer writable and then begin to queue data. This is an interesting
> >>>> idea.
> >>>>> Of course it would only have a positive effect when you had already
> >>>>> overflowed the socket buffer and were sending a very high throughput
> >> of
> >>>>> small messages. That basically is a way to degrade an overloaded
> >>>>> synchronous
> >>>>> send into a batched send. This is not really the same as what we have
> >>>> done,
> >>>>> which is to allow the ability to trade off latency for throughput in
> >> a
> >>>>> configurable manner. The reason the later is important is that we do
> >>> not
> >>>>> have a handful of producers sending at a rate that saturates the
> >>> network
> >>>>> I/O
> >>>>> capacity of those servers (the case where the group commit would
> >> help)
> >>>> but
> >>>>> rather we have thousands of producers sending at a medium low volume,
> >>> so
> >>>> we
> >>>>> would never hit that in our use case. The advantage of batching is
> >>> fewer
> >>>>> requests that hit the server, and larger packets. Where the group
> >>> commit
> >>>>> would help is for the synchronous producer benchmarks, where you
> >> could
> >>>>> potentially get much better throughput. This is something we should
> >>>>> consider
> >>>>> adding.
> >>>>>
> >>>>> To be clear, though, we have not added latency in our layer, just
> >> made
> >>> a
> >>>>> configurable way to trade-off latency for throughput. This is
> >>>> unambiguously
> >>>>> a good thing, I think.
> >>>>>
> >>>>> With respect to mmap, i think you are misunderstanding where the
> >>> latency
> >>>>> comes from. We immediately write data to the filesystem with no delay
> >>>>> whatsoever. This incurs the overhead of a system call, as you point
> >>> out,
> >>>>> which could be avoided by mmap, but that doesn't add much in the way
> >> of
> >>>>> latency. The latency comes from the fact that we do not make the
> >>> written
> >>>>> data available to consumers until we fsync the file to "ensure" the
> >>>>> durability of consumed messages. The frequency of the fsync is
> >>>>> configurable,
> >>>>> anything either immediate or with a time or # messages threshold.
> >> This
> >>>>> again
> >>>>> trades latency for throughput.
> >>>>>
> >>>>> -Jay
> >>>>>
> >>>>> On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <
> >> psutter@quantbench.com
> >>>>>> wrote:
> >>>>>
> >>>>>> *Producer latency* - I'm not familiar with zeromq internals but my
> >>>>>> understanding is that they send the first message(s) immediately
> >> and
> >>> as
> >>>>> TCP
> >>>>>> queues up the data, it will eventually block as the send buffer
> >>> fills,
> >>>>> and
> >>>>>> during this time messages can queue up, and thte net-net is that on
> >>>>> average
> >>>>>> the number of system calls is << the number of messages. The key is
> >>>>> having
> >>>>>> a
> >>>>>> separate thread for network operations with very efficient thread
> >>>>>> coordination.  Clearly Nagle is disabled (TCP_NODELAY) as Nagle is
> >> a
> >>>>> blight
> >>>>>> against humanity.
> >>>>>>
> >>>>>> Having any sort of delay adds latency. If every developer thinks
> >> its
> >>> OK
> >>>>> to
> >>>>>> add a little latency in his layer, pretty soon you end up with 10
> >>>> second
> >>>>>> end
> >>>>>> to end latency.
> >>>>>>
> >>>>>> Having an "accumulated message count" is also bad for WAN
> >>> performance.
> >>>> If
> >>>>>> your "window size" is a set of delayed messages, the only way to
> >> deal
> >>>>> with
> >>>>>> a
> >>>>>> large bandwidth*delay product is to delay a lot of messages, then
> >>> send
> >>>>>> them.
> >>>>>> You can fit a lot of data into a fiber. Imagine a gigabit link with
> >>>> 100ms
> >>>>>> roundtrip time, you can store 100MB in the fiber. And you need a
> >>>>> multiples
> >>>>>> of that for buffering if you need to do a retransmit.
> >>>>>>
> >>>>>> *Broker Latency *- With mmap the memcpy() of the message should
> >> make
> >>>> the
> >>>>>> data available to a thread even in another process, the pages that
> >>> you
> >>>>> have
> >>>>>> mapped are also in the buffer cache and available to a sendfile()
> >>> call.
> >>>>> or
> >>>>>> at least I think so. The flush to physical disk (or msync() in this
> >>>> case)
> >>>>>> would still be delayed but without impacting end to end latency.
> >>>>>>
> >>>>>> That said, in benchmarks I have done the fastest IO with the lowest
> >>> CPU
> >>>>>> overhead is unbuffered (direct) IO (which is lower overhead than
> >>> using
> >>>>> the
> >>>>>> buffer cache with or without memory mapping), but then you'd have
> >> to
> >>>>> manage
> >>>>>> your own buffer pool and run your broker in a single multithreaded
> >>>>> process.
> >>>>>> But thats getting more extreme. Just getting rid of this buffer
> >> write
> >>>>> delay
> >>>>>> by using memory mapping will remove a big chunk of latency.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <ja...@gmail.com>
> >>>> wrote:
> >>>>>>
> >>>>>>> Hi Paul,
> >>>>>>>
> >>>>>>> We are definitely interested in lowering latency--lower is always
> >>>>>>> better--but that was not a major concern for us so far (we were
> >>>>> replacing
> >>>>>> a
> >>>>>>> system with 1 hour latency), so we haven't focused on it yet. As
> >>> you
> >>>>>>> describe latency in our setup at linkedin comes from batching on
> >>> the
> >>>>>>> frontend and batching on the kafka servers do to very lenient
> >> flush
> >>>>>>> settings.
> >>>>>>>
> >>>>>>> I am interested in your comments on zeromq. Do they actually have
> >> a
> >>>>>> better
> >>>>>>> approach for this problem even when using TCP? If so I would be
> >>>>>> interested
> >>>>>>> to understand. The way I see things this is about trading
> >>> throughput
> >>>>> and
> >>>>>>> latency. On the producer side you have only a few options:
> >>>> immediately
> >>>>>>> write
> >>>>>>> the data to the socket buffer for sending or wait and see if the
> >>>>>>> application
> >>>>>>> writes more data. The OS will do this for you unless you set
> >>>>> TCP_NODELAY,
> >>>>>>> but the OS is relatively inflexible, it doesn't understand your
> >>> data
> >>>> so
> >>>>> I
> >>>>>>> think it just waits 200ms or until the socket buffer is full.
> >>>>>>>
> >>>>>>> The current approach in the async producer captures the same
> >>>> tradeoff,
> >>>>>> but
> >>>>>>> a
> >>>>>>> little more flexibly, it allows you to specify a max delay and
> >> max
> >>>>>>> accumulated message count, data is written when either of those
> >> is
> >>>> hit.
> >>>>>>>
> >>>>>>> Is it possible to better capture this tradeoff? Basically I am
> >> not
> >>>>> aware
> >>>>>> of
> >>>>>>> any other trick here if you are using TCP, so i would be
> >> interested
> >>>> in
> >>>>>> what
> >>>>>>> zeromq does if they are doing this better.
> >>>>>>>
> >>>>>>> We do indeed write each message set to the filesystem as it
> >> arrives
> >>>> but
> >>>>>> we
> >>>>>>> distribute messages to consumers only after the write has been
> >>>> flushed
> >>>>> to
> >>>>>>> disk, delaying (batching) that flush is the cause of the latency
> >>> but
> >>>>> also
> >>>>>>> gives better use of IOPs by generating larger writes. Mmap would
> >>>> remove
> >>>>>> the
> >>>>>>> system call (which would be good), but not the flush I think. As
> >>> you
> >>>>> say,
> >>>>>>> adding replication allows giving stronger guarantees without
> >>> actually
> >>>>>>> caring
> >>>>>>> about durability on a particular server which would make it
> >>> possible
> >>>> to
> >>>>>>> distribute messages to consumers after ack from some number of
> >>> other
> >>>>>>> servers
> >>>>>>> irrespective of flushing to disk.
> >>>>>>>
> >>>>>>> -Jay
> >>>>>>>
> >>>>>>> On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <
> >>> psutter@quantbench.com
> >>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Jun
> >>>>>>>>
> >>>>>>>> Thanks for your answers and the link to the paper - that helps
> >> a
> >>>> lot,
> >>>>>>>> especially the comment in the paper that 10 second end to end
> >>>> latency
> >>>>>> is
> >>>>>>>> good enough for your intended use case.
> >>>>>>>>
> >>>>>>>> We're looking for much lower latencies, and the basic design of
> >>>> Kafka
> >>>>>>> feels
> >>>>>>>> like it should support latencies in milliseconds with a few
> >>>> changes.
> >>>>>>> We're
> >>>>>>>> either going to build our own system, or help develop something
> >>>> that
> >>>>>>>> already
> >>>>>>>> exists, so please take my comments in the constructive way
> >>> they're
> >>>>>>> intended
> >>>>>>>> (I realize the changes I'm suggesting are outside your intended
> >>> use
> >>>>>> case,
> >>>>>>>> but if you're interested we may be able to provide a very
> >> capable
> >>>>>>> developer
> >>>>>>>> to help with the work, assuming we choose kafka over the other
> >>>>> zillion
> >>>>>>>> streaming systems that are coming out of the woodwork).
> >>>>>>>>
> >>>>>>>> a. *Producer "queue.time"* - In my question 4 below, I was
> >>>> referring
> >>>>> to
> >>>>>>> the
> >>>>>>>> producer queue time.  With a default value of 5 seconds, that
> >>>>> accounts
> >>>>>>> for
> >>>>>>>> half your end to end latency. A system like zeromq is optimized
> >>> to
> >>>>>> write
> >>>>>>>> data immediately without delay, but in such a way to minimizes
> >>> the
> >>>>>> number
> >>>>>>>> of
> >>>>>>>> system calls required during high throughput messages. Zeromq
> >> is
> >>> no
> >>>>>>>> nirvana,
> >>>>>>>> but it has a number of nice properties.
> >>>>>>>>
> >>>>>>>> b. *Broker "log.default.flush.interval.ms"* - The default
> >> value
> >>> of
> >>>> 3
> >>>>>>>> seconds
> >>>>>>>> appears to be another significant source of latency in the
> >>> system,
> >>>>>>> assuming
> >>>>>>>> that clients are unable to access data until it has been
> >> flushed.
> >>>>> Since
> >>>>>>> you
> >>>>>>>> have wisely chosen to take advantage of the buffer cache as
> >> part
> >>> of
> >>>>>> your
> >>>>>>>> system design, it seems that you could remove this latency
> >>>> completely
> >>>>>> by
> >>>>>>>> memory mapping the partitions and memcpying each message as it
> >>>>> arrives.
> >>>>>>>> With
> >>>>>>>> the right IPC mechanism clients could have immediate access to
> >>> new
> >>>>>>>> messages.
> >>>>>>>>
> >>>>>>>> c. *Batching, sync vs async, replication, and auditing*. Its
> >>>>>>> understandable
> >>>>>>>> that you've chosen a a forensic approach to producer
> >> reliability
> >>>>> (after
> >>>>>>> the
> >>>>>>>> fact auditing), but when you implement replication it would be
> >>>> really
> >>>>>>> nice
> >>>>>>>> to revise the producer protocol mechanisms. If you used a
> >>> streaming
> >>>>>>>> mechanism with producer offsets and ACKs, you could ensure
> >>> reliable
> >>>>>>>> delivery
> >>>>>>>> of producer streams to multiple brokers without the need to
> >>> choose
> >>>> a
> >>>>>>> "batch
> >>>>>>>> size" or "queue.time". This could also give you active/active
> >>>>> failover
> >>>>>> of
> >>>>>>>> brokers. This may also help in the WAN case (my question 3
> >> below)
> >>>>>> because
> >>>>>>>> you will be able to adaptively stuff more and more data through
> >>> the
> >>>>>> fiber
> >>>>>>>> for high bandwidth*delay links without having to choose a large
> >>>>> "batch
> >>>>>>>> size"
> >>>>>>>> nor have the additional latency that entails. Oh, and it will
> >>> help
> >>>>> you
> >>>>>>> deal
> >>>>>>>> with CRC errors once you start checking for them.
> >>>>>>>>
> >>>>>>>> c. *Performance measurements* - I'd like to make a suggestion
> >> for
> >>>>> your
> >>>>>>>> performance measurements. Your benchmarks measure throughput,
> >> but
> >>> a
> >>>>>>>> throughput number is meaningless without an associated "% cpu
> >>>> time".
> >>>>>>>> Ideally
> >>>>>>>> all measurements achieve wire speed (100MB/sec) at 0% CPU
> >> (since,
> >>>>> after
> >>>>>>>> all,
> >>>>>>>> this is plumbing and we assume the cores in the system should
> >>> have
> >>>>>>> capacity
> >>>>>>>> set aside for useful work too). Obviously nobody ever achieves
> >>>> this,
> >>>>>> but
> >>>>>>> by
> >>>>>>>> measuring it one can raise the bar in terms of optimization.
> >>>>>>>>
> >>>>>>>> Paul
> >>>>>>>>
> >>>>>>>> ps. Just for background, I am the cofounder at Quantcast where
> >> we
> >>>>>> process
> >>>>>>>> 3.5PB of data per day. These questions are related to my new
> >>>> startup
> >>>>>>>> Quantbench which will deal with financial market data where you
> >>>> dont
> >>>>>> want
> >>>>>>>> any latency at all. And WAN issues are a big deal too.
> >>>> Incidentally,
> >>>>> I
> >>>>>>> was
> >>>>>>>> also founder of Orbital Data which was a WAN optimization
> >> company
> >>>> so
> >>>>>> I've
> >>>>>>>> done a lot of work with protocols over long distances.
> >>>>>>>>
> >>>>>>>> On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <ju...@gmail.com>
> >>> wrote:
> >>>>>>>>
> >>>>>>>>> Paul,
> >>>>>>>>>
> >>>>>>>>> Excellent questions. See my answers below. Thanks,
> >>>>>>>>>
> >>>>>>>>> On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <
> >>>>> psutter@quantbench.com
> >>>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Kafka looks like an exciting project, thanks for opening it
> >>> up.
> >>>>>>>>>>
> >>>>>>>>>> I have a few questions:
> >>>>>>>>>>
> >>>>>>>>>> 1. Are checksums end to end (ie, created by the producer
> >> and
> >>>>>> checked
> >>>>>>> by
> >>>>>>>>> the
> >>>>>>>>>> consumer)? or are they only used to confirm buffercache
> >>>> behavior
> >>>>> on
> >>>>>>>> disk
> >>>>>>>>> as
> >>>>>>>>>> mentioned in the documentation? Bit errors occur vastly
> >> more
> >>>>> often
> >>>>>>> than
> >>>>>>>>>> most
> >>>>>>>>>> people assume, often because of device driver bugs. TCP
> >> only
> >>>>>> detects
> >>>>>>> 1
> >>>>>>>>>> error
> >>>>>>>>>> in 65536, so errors can flow through (if you like I can
> >> send
> >>>>> links
> >>>>>> to
> >>>>>>>>>> papers
> >>>>>>>>>> describing the need for checksums everywhere).
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Checksum is generated at the producer and propagated to the
> >>>> broker
> >>>>>> and
> >>>>>>>>> eventually the consumer. Currently, we only validate the
> >>> checksum
> >>>>> at
> >>>>>>> the
> >>>>>>>>> broker. We could further validate it at the consumer in the
> >>>> future.
> >>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 2. The consumer has a pretty solid mechanism to ensure it
> >>> hasnt
> >>>>>>> missed
> >>>>>>>>> any
> >>>>>>>>>> messages (i like the design by the way), but how does the
> >>>>> producer
> >>>>>>> know
> >>>>>>>>>> that
> >>>>>>>>>> all of its messages have been stored? (no apparent message
> >> id
> >>>> on
> >>>>>> that
> >>>>>>>>> side
> >>>>>>>>>> since the message id isnt known until the message is
> >> written
> >>> to
> >>>>> the
> >>>>>>>>> file).
> >>>>>>>>>> I'm especially curious how failover/replication could be
> >>>>>> implemented
> >>>>>>>> and
> >>>>>>>>>> I'm
> >>>>>>>>>> thinking that acks on the publisher side may help)
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> The producer side auditing is not built-in. At LinkedIn, we
> >> do
> >>>> that
> >>>>>> by
> >>>>>>>>> generating an auditing event periodically in the eventhandler
> >>> of
> >>>>> the
> >>>>>>>> async
> >>>>>>>>> producer. The auditing event contains the number of events
> >>>> produced
> >>>>>> in
> >>>>>>> a
> >>>>>>>>> configured window (e.g., 10 minutes) and are sent to a
> >> separate
> >>>>>> topic.
> >>>>>>>> The
> >>>>>>>>> consumer can read the actual data and the auditing event and
> >>>>> compare
> >>>>>>> the
> >>>>>>>>> counts. See our paper (
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
> >>>>>>>>> )
> >>>>>>>>> for some more details.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 3. Has the consumer's flow control been tested over high
> >>>>>>>> bandwidth*delay
> >>>>>>>>>> links? (what bandwidth can you get from a London consumer
> >> of
> >>> an
> >>>>> SF
> >>>>>>>>>> cluster?)
> >>>>>>>>>>
> >>>>>>>>>> Yes, we actually replicate kafka data across data centers,
> >>>> using
> >>>>> an
> >>>>>>>>> embedded consumer in a broker. Again, there is a bit more
> >> info
> >>> on
> >>>>>> this
> >>>>>>> in
> >>>>>>>>> our paper.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> 4. What kind of performance do you get if you set the
> >>>> producer's
> >>>>>>>> message
> >>>>>>>>>> delay to zero? (ie, is there a separate system call for
> >> each
> >>>>>> message?
> >>>>>>>> or
> >>>>>>>>> do
> >>>>>>>>>> you manage to aggregate messages into a smaller number of
> >>>> system
> >>>>>>> calls
> >>>>>>>>> even
> >>>>>>>>>> with a delay of 0?)
> >>>>>>>>>>
> >>>>>>>>>> I assume that you are referring to the flush interval. One
> >>> can
> >>>>>>>> configure
> >>>>>>>>> to
> >>>>>>>>> flush every message to disk. This will slow down the
> >> throughput
> >>>>>>>>> significantly.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> 5. Have you considered using a library like zeromq for the
> >>>>>> messaging
> >>>>>>>>> layer
> >>>>>>>>>> instead of rolling your own? (zeromq will handle #4 cleanly
> >>> at
> >>>>>>> millions
> >>>>>>>>> of
> >>>>>>>>>> messages per second and has clients in 20 languages)
> >>>>>>>>>>
> >>>>>>>>>> No. Our proprietary format allows us to support things like
> >>>>>>> compression
> >>>>>>>>> in
> >>>>>>>>> the future. However, we can definitely look into the zeromq
> >>>> format.
> >>>>>> Is
> >>>>>>>>> their
> >>>>>>>>> messaging layer easily extractable?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> 6. Do you have any plans to support intermediate processing
> >>>>>> elements
> >>>>>>>> the
> >>>>>>>>>> way
> >>>>>>>>>> Flume supports?
> >>>>>>>>>>
> >>>>>>>>>> For now, we are just focusing on getting the raw messaging
> >>>> layer
> >>>>>>> solid.
> >>>>>>>>> We
> >>>>>>>>> have worked a bit on streaming processing and will look into
> >>> that
> >>>>>> again
> >>>>>>>> in
> >>>>>>>>> the future.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> 7. The docs mention that new versions will only be released
> >>>> after
> >>>>>>> they
> >>>>>>>>> are
> >>>>>>>>>> in production at LinkedIn? Does that mean that the latest
> >>>> version
> >>>>>> of
> >>>>>>>> the
> >>>>>>>>>> source code is hidden at LinkedIn and contributors would
> >> have
> >>>> to
> >>>>>>> throw
> >>>>>>>>>> patches over the wall and wait months to get the integrated
> >>>>>> product?
> >>>>>>>>>>
> >>>>>>>>>> What we ran at LinkedIn is the same version in open source
> >>> and
> >>>>>> there
> >>>>>>> is
> >>>>>>>>> no
> >>>>>>>>> internal repository of Kafka at LinkedIn. We plan to maintain
> >>>> that
> >>>>> in
> >>>>>>> the
> >>>>>>>>> future.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> Thanks!
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>

Re: Kafka questions

Posted by "Alan D. Cabrera" <li...@toolazydogs.com>.
It would be good to move the content of the PDF files to the wiki so that the community can participate in the design.  These PDF files need to be removed.


Regards,
Alan

On Jul 20, 2011, at 9:07 AM, Jun Rao wrote:

> Oliver,
> 
> We have a design for replication (see the design doc and subtasks at
> https://issues.apache.org/jira/browse/KAFKA-50). We are currently wrapping
> up the compression support and will start working on replication soon.
> 
> Jun
> 
> On Tue, Jul 19, 2011 at 12:59 PM, Olivier Pomel <ol...@datadoghq.com> wrote:
> 
>> Thanks, guys, this was a great thread. May be worth pointing to it in the
>> online docs as it asks and answers a lot of interesting questions about the
>> performance characteristics and tradeoffs made in Kafka.
>> 
>> How far out do you think built-in replication is?
>> Best,
>> O.
>> 
>> 
>> 
>> On Tue, Jul 19, 2011 at 3:23 PM, Jay Kreps <ja...@gmail.com> wrote:
>> 
>>> Agreed, no reason the policy to hand out messages should not be
>>> configurable. We were hoping to make the whole question irrelevant with
>> the
>>> replication since then the producer can choose the replication level it
>>> wants and fsync durability should be less of a concern.
>>> 
>>> I agree with your comment that a good implementation of streaming with
>> acks
>>> being potentially superior.
>>> 
>>> -Jay
>>> 
>>> On Tue, Jul 19, 2011 at 11:15 AM, Paul Sutter <psutter@quantbench.com
>>>> wrote:
>>> 
>>>> Jay,
>>>> 
>>>> Ah - thanks for the clarification on the delay in the broker. It would
>> be
>>>> nice to if that were a configuration option, so that the end user can
>>>> choose
>>>> only to forward messages that have been written to disk, or choose to
>>> have
>>>> the data forwarded immediately. When you implement replication data
>>> hitting
>>>> the disk will matter less.
>>>> 
>>>> On the delay in the producer, I think it could best be resolved through
>>>> measurement. In your paper you compare two different approaches, and
>> I'm
>>>> proposing a third:
>>>> 
>>>> 1. Send and wait (single message, JMS style)
>>>> 2. Batch, send, and wait (Kafka today)
>>>> 3. Stream with ACKs
>>>> 
>>>> Removing any wait for a reply should increase throughput, not decrease
>>> it,
>>>> so you're likely trading latency against potential CPU efficiency. And
>>> the
>>>> CPU savings is a question best resolved by measurement.
>>>> 
>>>> I'd also encourage you to think about the WAN case. When you
>>> send-and-wait,
>>>> you need to send a buffer that is >> the bandwidth delay product to
>>>> approach
>>>> full line utilization, and the line will go idle for one RTT while you
>>> stop
>>>> to wait for a reply. The bandwidth*delay product can get large (10s of
>>>> megabytes), and end users will rarely understand the need to tune the
>>> batch
>>>> size to increase throughput. They'll just say it's slow over long
>>>> distances.
>>>> 
>>>> All that said - your use case doesn't require minimizing latency or WAN
>>>> use,
>>>> so I can really understand if this isn't a priority for you.
>>>> 
>>>> It's a well designed product that has had some real thought put into
>> it.
>>>> It's a really promising system, thanks for taking the time to respond
>> to
>>> my
>>>> comments.
>>>> 
>>>> Paul
>>>> 
>>>> On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <ja...@gmail.com>
>> wrote:
>>>> 
>>>>> Ah, I think what you are describing in zeromq is essentially the
>>>> equivalent
>>>>> of group commit for the socket. Essentially you wait until the socket
>>> is
>>>> no
>>>>> longer writable and then begin to queue data. This is an interesting
>>>> idea.
>>>>> Of course it would only have a positive effect when you had already
>>>>> overflowed the socket buffer and were sending a very high throughput
>> of
>>>>> small messages. That basically is a way to degrade an overloaded
>>>>> synchronous
>>>>> send into a batched send. This is not really the same as what we have
>>>> done,
>>>>> which is to allow the ability to trade off latency for throughput in
>> a
>>>>> configurable manner. The reason the later is important is that we do
>>> not
>>>>> have a handful of producers sending at a rate that saturates the
>>> network
>>>>> I/O
>>>>> capacity of those servers (the case where the group commit would
>> help)
>>>> but
>>>>> rather we have thousands of producers sending at a medium low volume,
>>> so
>>>> we
>>>>> would never hit that in our use case. The advantage of batching is
>>> fewer
>>>>> requests that hit the server, and larger packets. Where the group
>>> commit
>>>>> would help is for the synchronous producer benchmarks, where you
>> could
>>>>> potentially get much better throughput. This is something we should
>>>>> consider
>>>>> adding.
>>>>> 
>>>>> To be clear, though, we have not added latency in our layer, just
>> made
>>> a
>>>>> configurable way to trade-off latency for throughput. This is
>>>> unambiguously
>>>>> a good thing, I think.
>>>>> 
>>>>> With respect to mmap, i think you are misunderstanding where the
>>> latency
>>>>> comes from. We immediately write data to the filesystem with no delay
>>>>> whatsoever. This incurs the overhead of a system call, as you point
>>> out,
>>>>> which could be avoided by mmap, but that doesn't add much in the way
>> of
>>>>> latency. The latency comes from the fact that we do not make the
>>> written
>>>>> data available to consumers until we fsync the file to "ensure" the
>>>>> durability of consumed messages. The frequency of the fsync is
>>>>> configurable,
>>>>> anything either immediate or with a time or # messages threshold.
>> This
>>>>> again
>>>>> trades latency for throughput.
>>>>> 
>>>>> -Jay
>>>>> 
>>>>> On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <
>> psutter@quantbench.com
>>>>>> wrote:
>>>>> 
>>>>>> *Producer latency* - I'm not familiar with zeromq internals but my
>>>>>> understanding is that they send the first message(s) immediately
>> and
>>> as
>>>>> TCP
>>>>>> queues up the data, it will eventually block as the send buffer
>>> fills,
>>>>> and
>>>>>> during this time messages can queue up, and thte net-net is that on
>>>>> average
>>>>>> the number of system calls is << the number of messages. The key is
>>>>> having
>>>>>> a
>>>>>> separate thread for network operations with very efficient thread
>>>>>> coordination.  Clearly Nagle is disabled (TCP_NODELAY) as Nagle is
>> a
>>>>> blight
>>>>>> against humanity.
>>>>>> 
>>>>>> Having any sort of delay adds latency. If every developer thinks
>> its
>>> OK
>>>>> to
>>>>>> add a little latency in his layer, pretty soon you end up with 10
>>>> second
>>>>>> end
>>>>>> to end latency.
>>>>>> 
>>>>>> Having an "accumulated message count" is also bad for WAN
>>> performance.
>>>> If
>>>>>> your "window size" is a set of delayed messages, the only way to
>> deal
>>>>> with
>>>>>> a
>>>>>> large bandwidth*delay product is to delay a lot of messages, then
>>> send
>>>>>> them.
>>>>>> You can fit a lot of data into a fiber. Imagine a gigabit link with
>>>> 100ms
>>>>>> roundtrip time, you can store 100MB in the fiber. And you need a
>>>>> multiples
>>>>>> of that for buffering if you need to do a retransmit.
>>>>>> 
>>>>>> *Broker Latency *- With mmap the memcpy() of the message should
>> make
>>>> the
>>>>>> data available to a thread even in another process, the pages that
>>> you
>>>>> have
>>>>>> mapped are also in the buffer cache and available to a sendfile()
>>> call.
>>>>> or
>>>>>> at least I think so. The flush to physical disk (or msync() in this
>>>> case)
>>>>>> would still be delayed but without impacting end to end latency.
>>>>>> 
>>>>>> That said, in benchmarks I have done the fastest IO with the lowest
>>> CPU
>>>>>> overhead is unbuffered (direct) IO (which is lower overhead than
>>> using
>>>>> the
>>>>>> buffer cache with or without memory mapping), but then you'd have
>> to
>>>>> manage
>>>>>> your own buffer pool and run your broker in a single multithreaded
>>>>> process.
>>>>>> But thats getting more extreme. Just getting rid of this buffer
>> write
>>>>> delay
>>>>>> by using memory mapping will remove a big chunk of latency.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <ja...@gmail.com>
>>>> wrote:
>>>>>> 
>>>>>>> Hi Paul,
>>>>>>> 
>>>>>>> We are definitely interested in lowering latency--lower is always
>>>>>>> better--but that was not a major concern for us so far (we were
>>>>> replacing
>>>>>> a
>>>>>>> system with 1 hour latency), so we haven't focused on it yet. As
>>> you
>>>>>>> describe latency in our setup at linkedin comes from batching on
>>> the
>>>>>>> frontend and batching on the kafka servers do to very lenient
>> flush
>>>>>>> settings.
>>>>>>> 
>>>>>>> I am interested in your comments on zeromq. Do they actually have
>> a
>>>>>> better
>>>>>>> approach for this problem even when using TCP? If so I would be
>>>>>> interested
>>>>>>> to understand. The way I see things this is about trading
>>> throughput
>>>>> and
>>>>>>> latency. On the producer side you have only a few options:
>>>> immediately
>>>>>>> write
>>>>>>> the data to the socket buffer for sending or wait and see if the
>>>>>>> application
>>>>>>> writes more data. The OS will do this for you unless you set
>>>>> TCP_NODELAY,
>>>>>>> but the OS is relatively inflexible, it doesn't understand your
>>> data
>>>> so
>>>>> I
>>>>>>> think it just waits 200ms or until the socket buffer is full.
>>>>>>> 
>>>>>>> The current approach in the async producer captures the same
>>>> tradeoff,
>>>>>> but
>>>>>>> a
>>>>>>> little more flexibly, it allows you to specify a max delay and
>> max
>>>>>>> accumulated message count, data is written when either of those
>> is
>>>> hit.
>>>>>>> 
>>>>>>> Is it possible to better capture this tradeoff? Basically I am
>> not
>>>>> aware
>>>>>> of
>>>>>>> any other trick here if you are using TCP, so i would be
>> interested
>>>> in
>>>>>> what
>>>>>>> zeromq does if they are doing this better.
>>>>>>> 
>>>>>>> We do indeed write each message set to the filesystem as it
>> arrives
>>>> but
>>>>>> we
>>>>>>> distribute messages to consumers only after the write has been
>>>> flushed
>>>>> to
>>>>>>> disk, delaying (batching) that flush is the cause of the latency
>>> but
>>>>> also
>>>>>>> gives better use of IOPs by generating larger writes. Mmap would
>>>> remove
>>>>>> the
>>>>>>> system call (which would be good), but not the flush I think. As
>>> you
>>>>> say,
>>>>>>> adding replication allows giving stronger guarantees without
>>> actually
>>>>>>> caring
>>>>>>> about durability on a particular server which would make it
>>> possible
>>>> to
>>>>>>> distribute messages to consumers after ack from some number of
>>> other
>>>>>>> servers
>>>>>>> irrespective of flushing to disk.
>>>>>>> 
>>>>>>> -Jay
>>>>>>> 
>>>>>>> On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <
>>> psutter@quantbench.com
>>>>> 
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Jun
>>>>>>>> 
>>>>>>>> Thanks for your answers and the link to the paper - that helps
>> a
>>>> lot,
>>>>>>>> especially the comment in the paper that 10 second end to end
>>>> latency
>>>>>> is
>>>>>>>> good enough for your intended use case.
>>>>>>>> 
>>>>>>>> We're looking for much lower latencies, and the basic design of
>>>> Kafka
>>>>>>> feels
>>>>>>>> like it should support latencies in milliseconds with a few
>>>> changes.
>>>>>>> We're
>>>>>>>> either going to build our own system, or help develop something
>>>> that
>>>>>>>> already
>>>>>>>> exists, so please take my comments in the constructive way
>>> they're
>>>>>>> intended
>>>>>>>> (I realize the changes I'm suggesting are outside your intended
>>> use
>>>>>> case,
>>>>>>>> but if you're interested we may be able to provide a very
>> capable
>>>>>>> developer
>>>>>>>> to help with the work, assuming we choose kafka over the other
>>>>> zillion
>>>>>>>> streaming systems that are coming out of the woodwork).
>>>>>>>> 
>>>>>>>> a. *Producer "queue.time"* - In my question 4 below, I was
>>>> referring
>>>>> to
>>>>>>> the
>>>>>>>> producer queue time.  With a default value of 5 seconds, that
>>>>> accounts
>>>>>>> for
>>>>>>>> half your end to end latency. A system like zeromq is optimized
>>> to
>>>>>> write
>>>>>>>> data immediately without delay, but in such a way to minimizes
>>> the
>>>>>> number
>>>>>>>> of
>>>>>>>> system calls required during high throughput messages. Zeromq
>> is
>>> no
>>>>>>>> nirvana,
>>>>>>>> but it has a number of nice properties.
>>>>>>>> 
>>>>>>>> b. *Broker "log.default.flush.interval.ms"* - The default
>> value
>>> of
>>>> 3
>>>>>>>> seconds
>>>>>>>> appears to be another significant source of latency in the
>>> system,
>>>>>>> assuming
>>>>>>>> that clients are unable to access data until it has been
>> flushed.
>>>>> Since
>>>>>>> you
>>>>>>>> have wisely chosen to take advantage of the buffer cache as
>> part
>>> of
>>>>>> your
>>>>>>>> system design, it seems that you could remove this latency
>>>> completely
>>>>>> by
>>>>>>>> memory mapping the partitions and memcpying each message as it
>>>>> arrives.
>>>>>>>> With
>>>>>>>> the right IPC mechanism clients could have immediate access to
>>> new
>>>>>>>> messages.
>>>>>>>> 
>>>>>>>> c. *Batching, sync vs async, replication, and auditing*. Its
>>>>>>> understandable
>>>>>>>> that you've chosen a a forensic approach to producer
>> reliability
>>>>> (after
>>>>>>> the
>>>>>>>> fact auditing), but when you implement replication it would be
>>>> really
>>>>>>> nice
>>>>>>>> to revise the producer protocol mechanisms. If you used a
>>> streaming
>>>>>>>> mechanism with producer offsets and ACKs, you could ensure
>>> reliable
>>>>>>>> delivery
>>>>>>>> of producer streams to multiple brokers without the need to
>>> choose
>>>> a
>>>>>>> "batch
>>>>>>>> size" or "queue.time". This could also give you active/active
>>>>> failover
>>>>>> of
>>>>>>>> brokers. This may also help in the WAN case (my question 3
>> below)
>>>>>> because
>>>>>>>> you will be able to adaptively stuff more and more data through
>>> the
>>>>>> fiber
>>>>>>>> for high bandwidth*delay links without having to choose a large
>>>>> "batch
>>>>>>>> size"
>>>>>>>> nor have the additional latency that entails. Oh, and it will
>>> help
>>>>> you
>>>>>>> deal
>>>>>>>> with CRC errors once you start checking for them.
>>>>>>>> 
>>>>>>>> c. *Performance measurements* - I'd like to make a suggestion
>> for
>>>>> your
>>>>>>>> performance measurements. Your benchmarks measure throughput,
>> but
>>> a
>>>>>>>> throughput number is meaningless without an associated "% cpu
>>>> time".
>>>>>>>> Ideally
>>>>>>>> all measurements achieve wire speed (100MB/sec) at 0% CPU
>> (since,
>>>>> after
>>>>>>>> all,
>>>>>>>> this is plumbing and we assume the cores in the system should
>>> have
>>>>>>> capacity
>>>>>>>> set aside for useful work too). Obviously nobody ever achieves
>>>> this,
>>>>>> but
>>>>>>> by
>>>>>>>> measuring it one can raise the bar in terms of optimization.
>>>>>>>> 
>>>>>>>> Paul
>>>>>>>> 
>>>>>>>> ps. Just for background, I am the cofounder at Quantcast where
>> we
>>>>>> process
>>>>>>>> 3.5PB of data per day. These questions are related to my new
>>>> startup
>>>>>>>> Quantbench which will deal with financial market data where you
>>>> dont
>>>>>> want
>>>>>>>> any latency at all. And WAN issues are a big deal too.
>>>> Incidentally,
>>>>> I
>>>>>>> was
>>>>>>>> also founder of Orbital Data which was a WAN optimization
>> company
>>>> so
>>>>>> I've
>>>>>>>> done a lot of work with protocols over long distances.
>>>>>>>> 
>>>>>>>> On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <ju...@gmail.com>
>>> wrote:
>>>>>>>> 
>>>>>>>>> Paul,
>>>>>>>>> 
>>>>>>>>> Excellent questions. See my answers below. Thanks,
>>>>>>>>> 
>>>>>>>>> On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <
>>>>> psutter@quantbench.com
>>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Kafka looks like an exciting project, thanks for opening it
>>> up.
>>>>>>>>>> 
>>>>>>>>>> I have a few questions:
>>>>>>>>>> 
>>>>>>>>>> 1. Are checksums end to end (ie, created by the producer
>> and
>>>>>> checked
>>>>>>> by
>>>>>>>>> the
>>>>>>>>>> consumer)? or are they only used to confirm buffercache
>>>> behavior
>>>>> on
>>>>>>>> disk
>>>>>>>>> as
>>>>>>>>>> mentioned in the documentation? Bit errors occur vastly
>> more
>>>>> often
>>>>>>> than
>>>>>>>>>> most
>>>>>>>>>> people assume, often because of device driver bugs. TCP
>> only
>>>>>> detects
>>>>>>> 1
>>>>>>>>>> error
>>>>>>>>>> in 65536, so errors can flow through (if you like I can
>> send
>>>>> links
>>>>>> to
>>>>>>>>>> papers
>>>>>>>>>> describing the need for checksums everywhere).
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Checksum is generated at the producer and propagated to the
>>>> broker
>>>>>> and
>>>>>>>>> eventually the consumer. Currently, we only validate the
>>> checksum
>>>>> at
>>>>>>> the
>>>>>>>>> broker. We could further validate it at the consumer in the
>>>> future.
>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 2. The consumer has a pretty solid mechanism to ensure it
>>> hasnt
>>>>>>> missed
>>>>>>>>> any
>>>>>>>>>> messages (i like the design by the way), but how does the
>>>>> producer
>>>>>>> know
>>>>>>>>>> that
>>>>>>>>>> all of its messages have been stored? (no apparent message
>> id
>>>> on
>>>>>> that
>>>>>>>>> side
>>>>>>>>>> since the message id isnt known until the message is
>> written
>>> to
>>>>> the
>>>>>>>>> file).
>>>>>>>>>> I'm especially curious how failover/replication could be
>>>>>> implemented
>>>>>>>> and
>>>>>>>>>> I'm
>>>>>>>>>> thinking that acks on the publisher side may help)
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> The producer side auditing is not built-in. At LinkedIn, we
>> do
>>>> that
>>>>>> by
>>>>>>>>> generating an auditing event periodically in the eventhandler
>>> of
>>>>> the
>>>>>>>> async
>>>>>>>>> producer. The auditing event contains the number of events
>>>> produced
>>>>>> in
>>>>>>> a
>>>>>>>>> configured window (e.g., 10 minutes) and are sent to a
>> separate
>>>>>> topic.
>>>>>>>> The
>>>>>>>>> consumer can read the actual data and the auditing event and
>>>>> compare
>>>>>>> the
>>>>>>>>> counts. See our paper (
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
>>>>>>>>> )
>>>>>>>>> for some more details.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 3. Has the consumer's flow control been tested over high
>>>>>>>> bandwidth*delay
>>>>>>>>>> links? (what bandwidth can you get from a London consumer
>> of
>>> an
>>>>> SF
>>>>>>>>>> cluster?)
>>>>>>>>>> 
>>>>>>>>>> Yes, we actually replicate kafka data across data centers,
>>>> using
>>>>> an
>>>>>>>>> embedded consumer in a broker. Again, there is a bit more
>> info
>>> on
>>>>>> this
>>>>>>> in
>>>>>>>>> our paper.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> 4. What kind of performance do you get if you set the
>>>> producer's
>>>>>>>> message
>>>>>>>>>> delay to zero? (ie, is there a separate system call for
>> each
>>>>>> message?
>>>>>>>> or
>>>>>>>>> do
>>>>>>>>>> you manage to aggregate messages into a smaller number of
>>>> system
>>>>>>> calls
>>>>>>>>> even
>>>>>>>>>> with a delay of 0?)
>>>>>>>>>> 
>>>>>>>>>> I assume that you are referring to the flush interval. One
>>> can
>>>>>>>> configure
>>>>>>>>> to
>>>>>>>>> flush every message to disk. This will slow down the
>> throughput
>>>>>>>>> significantly.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> 5. Have you considered using a library like zeromq for the
>>>>>> messaging
>>>>>>>>> layer
>>>>>>>>>> instead of rolling your own? (zeromq will handle #4 cleanly
>>> at
>>>>>>> millions
>>>>>>>>> of
>>>>>>>>>> messages per second and has clients in 20 languages)
>>>>>>>>>> 
>>>>>>>>>> No. Our proprietary format allows us to support things like
>>>>>>> compression
>>>>>>>>> in
>>>>>>>>> the future. However, we can definitely look into the zeromq
>>>> format.
>>>>>> Is
>>>>>>>>> their
>>>>>>>>> messaging layer easily extractable?
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> 6. Do you have any plans to support intermediate processing
>>>>>> elements
>>>>>>>> the
>>>>>>>>>> way
>>>>>>>>>> Flume supports?
>>>>>>>>>> 
>>>>>>>>>> For now, we are just focusing on getting the raw messaging
>>>> layer
>>>>>>> solid.
>>>>>>>>> We
>>>>>>>>> have worked a bit on streaming processing and will look into
>>> that
>>>>>> again
>>>>>>>> in
>>>>>>>>> the future.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> 7. The docs mention that new versions will only be released
>>>> after
>>>>>>> they
>>>>>>>>> are
>>>>>>>>>> in production at LinkedIn? Does that mean that the latest
>>>> version
>>>>>> of
>>>>>>>> the
>>>>>>>>>> source code is hidden at LinkedIn and contributors would
>> have
>>>> to
>>>>>>> throw
>>>>>>>>>> patches over the wall and wait months to get the integrated
>>>>>> product?
>>>>>>>>>> 
>>>>>>>>>> What we ran at LinkedIn is the same version in open source
>>> and
>>>>>> there
>>>>>>> is
>>>>>>>>> no
>>>>>>>>> internal repository of Kafka at LinkedIn. We plan to maintain
>>>> that
>>>>> in
>>>>>>> the
>>>>>>>>> future.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> Thanks!
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 


Re: Kafka questions

Posted by Jun Rao <ju...@gmail.com>.
Oliver,

We have a design for replication (see the design doc and subtasks at
https://issues.apache.org/jira/browse/KAFKA-50). We are currently wrapping
up the compression support and will start working on replication soon.

Jun

On Tue, Jul 19, 2011 at 12:59 PM, Olivier Pomel <ol...@datadoghq.com> wrote:

> Thanks, guys, this was a great thread. May be worth pointing to it in the
> online docs as it asks and answers a lot of interesting questions about the
> performance characteristics and tradeoffs made in Kafka.
>
> How far out do you think built-in replication is?
> Best,
> O.
>
>
>
> On Tue, Jul 19, 2011 at 3:23 PM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Agreed, no reason the policy to hand out messages should not be
> > configurable. We were hoping to make the whole question irrelevant with
> the
> > replication since then the producer can choose the replication level it
> > wants and fsync durability should be less of a concern.
> >
> > I agree with your comment that a good implementation of streaming with
> acks
> > being potentially superior.
> >
> > -Jay
> >
> > On Tue, Jul 19, 2011 at 11:15 AM, Paul Sutter <psutter@quantbench.com
> > >wrote:
> >
> > > Jay,
> > >
> > > Ah - thanks for the clarification on the delay in the broker. It would
> be
> > > nice to if that were a configuration option, so that the end user can
> > > choose
> > > only to forward messages that have been written to disk, or choose to
> > have
> > > the data forwarded immediately. When you implement replication data
> > hitting
> > > the disk will matter less.
> > >
> > > On the delay in the producer, I think it could best be resolved through
> > > measurement. In your paper you compare two different approaches, and
> I'm
> > > proposing a third:
> > >
> > > 1. Send and wait (single message, JMS style)
> > > 2. Batch, send, and wait (Kafka today)
> > > 3. Stream with ACKs
> > >
> > > Removing any wait for a reply should increase throughput, not decrease
> > it,
> > > so you're likely trading latency against potential CPU efficiency. And
> > the
> > > CPU savings is a question best resolved by measurement.
> > >
> > > I'd also encourage you to think about the WAN case. When you
> > send-and-wait,
> > > you need to send a buffer that is >> the bandwidth delay product to
> > > approach
> > > full line utilization, and the line will go idle for one RTT while you
> > stop
> > > to wait for a reply. The bandwidth*delay product can get large (10s of
> > > megabytes), and end users will rarely understand the need to tune the
> > batch
> > > size to increase throughput. They'll just say it's slow over long
> > > distances.
> > >
> > > All that said - your use case doesn't require minimizing latency or WAN
> > > use,
> > > so I can really understand if this isn't a priority for you.
> > >
> > > It's a well designed product that has had some real thought put into
> it.
> > > It's a really promising system, thanks for taking the time to respond
> to
> > my
> > > comments.
> > >
> > > Paul
> > >
> > > On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <ja...@gmail.com>
> wrote:
> > >
> > > > Ah, I think what you are describing in zeromq is essentially the
> > > equivalent
> > > > of group commit for the socket. Essentially you wait until the socket
> > is
> > > no
> > > > longer writable and then begin to queue data. This is an interesting
> > > idea.
> > > > Of course it would only have a positive effect when you had already
> > > > overflowed the socket buffer and were sending a very high throughput
> of
> > > > small messages. That basically is a way to degrade an overloaded
> > > > synchronous
> > > > send into a batched send. This is not really the same as what we have
> > > done,
> > > > which is to allow the ability to trade off latency for throughput in
> a
> > > > configurable manner. The reason the later is important is that we do
> > not
> > > > have a handful of producers sending at a rate that saturates the
> > network
> > > > I/O
> > > > capacity of those servers (the case where the group commit would
> help)
> > > but
> > > > rather we have thousands of producers sending at a medium low volume,
> > so
> > > we
> > > > would never hit that in our use case. The advantage of batching is
> > fewer
> > > > requests that hit the server, and larger packets. Where the group
> > commit
> > > > would help is for the synchronous producer benchmarks, where you
> could
> > > > potentially get much better throughput. This is something we should
> > > > consider
> > > > adding.
> > > >
> > > > To be clear, though, we have not added latency in our layer, just
> made
> > a
> > > > configurable way to trade-off latency for throughput. This is
> > > unambiguously
> > > > a good thing, I think.
> > > >
> > > > With respect to mmap, i think you are misunderstanding where the
> > latency
> > > > comes from. We immediately write data to the filesystem with no delay
> > > > whatsoever. This incurs the overhead of a system call, as you point
> > out,
> > > > which could be avoided by mmap, but that doesn't add much in the way
> of
> > > > latency. The latency comes from the fact that we do not make the
> > written
> > > > data available to consumers until we fsync the file to "ensure" the
> > > > durability of consumed messages. The frequency of the fsync is
> > > > configurable,
> > > > anything either immediate or with a time or # messages threshold.
> This
> > > > again
> > > > trades latency for throughput.
> > > >
> > > > -Jay
> > > >
> > > > On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <
> psutter@quantbench.com
> > > > >wrote:
> > > >
> > > > > *Producer latency* - I'm not familiar with zeromq internals but my
> > > > > understanding is that they send the first message(s) immediately
> and
> > as
> > > > TCP
> > > > > queues up the data, it will eventually block as the send buffer
> > fills,
> > > > and
> > > > > during this time messages can queue up, and thte net-net is that on
> > > > average
> > > > > the number of system calls is << the number of messages. The key is
> > > > having
> > > > > a
> > > > > separate thread for network operations with very efficient thread
> > > > > coordination.  Clearly Nagle is disabled (TCP_NODELAY) as Nagle is
> a
> > > > blight
> > > > > against humanity.
> > > > >
> > > > > Having any sort of delay adds latency. If every developer thinks
> its
> > OK
> > > > to
> > > > > add a little latency in his layer, pretty soon you end up with 10
> > > second
> > > > > end
> > > > > to end latency.
> > > > >
> > > > > Having an "accumulated message count" is also bad for WAN
> > performance.
> > > If
> > > > > your "window size" is a set of delayed messages, the only way to
> deal
> > > > with
> > > > > a
> > > > > large bandwidth*delay product is to delay a lot of messages, then
> > send
> > > > > them.
> > > > > You can fit a lot of data into a fiber. Imagine a gigabit link with
> > > 100ms
> > > > > roundtrip time, you can store 100MB in the fiber. And you need a
> > > > multiples
> > > > > of that for buffering if you need to do a retransmit.
> > > > >
> > > > > *Broker Latency *- With mmap the memcpy() of the message should
> make
> > > the
> > > > > data available to a thread even in another process, the pages that
> > you
> > > > have
> > > > > mapped are also in the buffer cache and available to a sendfile()
> > call.
> > > > or
> > > > > at least I think so. The flush to physical disk (or msync() in this
> > > case)
> > > > > would still be delayed but without impacting end to end latency.
> > > > >
> > > > > That said, in benchmarks I have done the fastest IO with the lowest
> > CPU
> > > > > overhead is unbuffered (direct) IO (which is lower overhead than
> > using
> > > > the
> > > > > buffer cache with or without memory mapping), but then you'd have
> to
> > > > manage
> > > > > your own buffer pool and run your broker in a single multithreaded
> > > > process.
> > > > > But thats getting more extreme. Just getting rid of this buffer
> write
> > > > delay
> > > > > by using memory mapping will remove a big chunk of latency.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <ja...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi Paul,
> > > > > >
> > > > > > We are definitely interested in lowering latency--lower is always
> > > > > > better--but that was not a major concern for us so far (we were
> > > > replacing
> > > > > a
> > > > > > system with 1 hour latency), so we haven't focused on it yet. As
> > you
> > > > > > describe latency in our setup at linkedin comes from batching on
> > the
> > > > > > frontend and batching on the kafka servers do to very lenient
> flush
> > > > > > settings.
> > > > > >
> > > > > > I am interested in your comments on zeromq. Do they actually have
> a
> > > > > better
> > > > > > approach for this problem even when using TCP? If so I would be
> > > > > interested
> > > > > > to understand. The way I see things this is about trading
> > throughput
> > > > and
> > > > > > latency. On the producer side you have only a few options:
> > > immediately
> > > > > > write
> > > > > > the data to the socket buffer for sending or wait and see if the
> > > > > > application
> > > > > > writes more data. The OS will do this for you unless you set
> > > > TCP_NODELAY,
> > > > > > but the OS is relatively inflexible, it doesn't understand your
> > data
> > > so
> > > > I
> > > > > > think it just waits 200ms or until the socket buffer is full.
> > > > > >
> > > > > > The current approach in the async producer captures the same
> > > tradeoff,
> > > > > but
> > > > > > a
> > > > > > little more flexibly, it allows you to specify a max delay and
> max
> > > > > > accumulated message count, data is written when either of those
> is
> > > hit.
> > > > > >
> > > > > > Is it possible to better capture this tradeoff? Basically I am
> not
> > > > aware
> > > > > of
> > > > > > any other trick here if you are using TCP, so i would be
> interested
> > > in
> > > > > what
> > > > > > zeromq does if they are doing this better.
> > > > > >
> > > > > > We do indeed write each message set to the filesystem as it
> arrives
> > > but
> > > > > we
> > > > > > distribute messages to consumers only after the write has been
> > > flushed
> > > > to
> > > > > > disk, delaying (batching) that flush is the cause of the latency
> > but
> > > > also
> > > > > > gives better use of IOPs by generating larger writes. Mmap would
> > > remove
> > > > > the
> > > > > > system call (which would be good), but not the flush I think. As
> > you
> > > > say,
> > > > > > adding replication allows giving stronger guarantees without
> > actually
> > > > > > caring
> > > > > > about durability on a particular server which would make it
> > possible
> > > to
> > > > > > distribute messages to consumers after ack from some number of
> > other
> > > > > > servers
> > > > > > irrespective of flushing to disk.
> > > > > >
> > > > > > -Jay
> > > > > >
> > > > > > On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <
> > psutter@quantbench.com
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > Thanks for your answers and the link to the paper - that helps
> a
> > > lot,
> > > > > > > especially the comment in the paper that 10 second end to end
> > > latency
> > > > > is
> > > > > > > good enough for your intended use case.
> > > > > > >
> > > > > > > We're looking for much lower latencies, and the basic design of
> > > Kafka
> > > > > > feels
> > > > > > > like it should support latencies in milliseconds with a few
> > > changes.
> > > > > > We're
> > > > > > > either going to build our own system, or help develop something
> > > that
> > > > > > > already
> > > > > > > exists, so please take my comments in the constructive way
> > they're
> > > > > > intended
> > > > > > > (I realize the changes I'm suggesting are outside your intended
> > use
> > > > > case,
> > > > > > > but if you're interested we may be able to provide a very
> capable
> > > > > > developer
> > > > > > > to help with the work, assuming we choose kafka over the other
> > > > zillion
> > > > > > > streaming systems that are coming out of the woodwork).
> > > > > > >
> > > > > > > a. *Producer "queue.time"* - In my question 4 below, I was
> > > referring
> > > > to
> > > > > > the
> > > > > > > producer queue time.  With a default value of 5 seconds, that
> > > > accounts
> > > > > > for
> > > > > > > half your end to end latency. A system like zeromq is optimized
> > to
> > > > > write
> > > > > > > data immediately without delay, but in such a way to minimizes
> > the
> > > > > number
> > > > > > > of
> > > > > > > system calls required during high throughput messages. Zeromq
> is
> > no
> > > > > > > nirvana,
> > > > > > > but it has a number of nice properties.
> > > > > > >
> > > > > > > b. *Broker "log.default.flush.interval.ms"* - The default
> value
> > of
> > > 3
> > > > > > > seconds
> > > > > > > appears to be another significant source of latency in the
> > system,
> > > > > > assuming
> > > > > > > that clients are unable to access data until it has been
> flushed.
> > > > Since
> > > > > > you
> > > > > > > have wisely chosen to take advantage of the buffer cache as
> part
> > of
> > > > > your
> > > > > > > system design, it seems that you could remove this latency
> > > completely
> > > > > by
> > > > > > > memory mapping the partitions and memcpying each message as it
> > > > arrives.
> > > > > > > With
> > > > > > > the right IPC mechanism clients could have immediate access to
> > new
> > > > > > > messages.
> > > > > > >
> > > > > > > c. *Batching, sync vs async, replication, and auditing*. Its
> > > > > > understandable
> > > > > > > that you've chosen a a forensic approach to producer
> reliability
> > > > (after
> > > > > > the
> > > > > > > fact auditing), but when you implement replication it would be
> > > really
> > > > > > nice
> > > > > > > to revise the producer protocol mechanisms. If you used a
> > streaming
> > > > > > > mechanism with producer offsets and ACKs, you could ensure
> > reliable
> > > > > > > delivery
> > > > > > > of producer streams to multiple brokers without the need to
> > choose
> > > a
> > > > > > "batch
> > > > > > > size" or "queue.time". This could also give you active/active
> > > > failover
> > > > > of
> > > > > > > brokers. This may also help in the WAN case (my question 3
> below)
> > > > > because
> > > > > > > you will be able to adaptively stuff more and more data through
> > the
> > > > > fiber
> > > > > > > for high bandwidth*delay links without having to choose a large
> > > > "batch
> > > > > > > size"
> > > > > > > nor have the additional latency that entails. Oh, and it will
> > help
> > > > you
> > > > > > deal
> > > > > > > with CRC errors once you start checking for them.
> > > > > > >
> > > > > > > c. *Performance measurements* - I'd like to make a suggestion
> for
> > > > your
> > > > > > > performance measurements. Your benchmarks measure throughput,
> but
> > a
> > > > > > > throughput number is meaningless without an associated "% cpu
> > > time".
> > > > > > > Ideally
> > > > > > > all measurements achieve wire speed (100MB/sec) at 0% CPU
> (since,
> > > > after
> > > > > > > all,
> > > > > > > this is plumbing and we assume the cores in the system should
> > have
> > > > > > capacity
> > > > > > > set aside for useful work too). Obviously nobody ever achieves
> > > this,
> > > > > but
> > > > > > by
> > > > > > > measuring it one can raise the bar in terms of optimization.
> > > > > > >
> > > > > > > Paul
> > > > > > >
> > > > > > > ps. Just for background, I am the cofounder at Quantcast where
> we
> > > > > process
> > > > > > > 3.5PB of data per day. These questions are related to my new
> > > startup
> > > > > > > Quantbench which will deal with financial market data where you
> > > dont
> > > > > want
> > > > > > > any latency at all. And WAN issues are a big deal too.
> > > Incidentally,
> > > > I
> > > > > > was
> > > > > > > also founder of Orbital Data which was a WAN optimization
> company
> > > so
> > > > > I've
> > > > > > > done a lot of work with protocols over long distances.
> > > > > > >
> > > > > > > On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <ju...@gmail.com>
> > wrote:
> > > > > > >
> > > > > > > > Paul,
> > > > > > > >
> > > > > > > > Excellent questions. See my answers below. Thanks,
> > > > > > > >
> > > > > > > > On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <
> > > > psutter@quantbench.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Kafka looks like an exciting project, thanks for opening it
> > up.
> > > > > > > > >
> > > > > > > > > I have a few questions:
> > > > > > > > >
> > > > > > > > > 1. Are checksums end to end (ie, created by the producer
> and
> > > > > checked
> > > > > > by
> > > > > > > > the
> > > > > > > > > consumer)? or are they only used to confirm buffercache
> > > behavior
> > > > on
> > > > > > > disk
> > > > > > > > as
> > > > > > > > > mentioned in the documentation? Bit errors occur vastly
> more
> > > > often
> > > > > > than
> > > > > > > > > most
> > > > > > > > > people assume, often because of device driver bugs. TCP
> only
> > > > > detects
> > > > > > 1
> > > > > > > > > error
> > > > > > > > > in 65536, so errors can flow through (if you like I can
> send
> > > > links
> > > > > to
> > > > > > > > > papers
> > > > > > > > > describing the need for checksums everywhere).
> > > > > > > > >
> > > > > > > >
> > > > > > > > Checksum is generated at the producer and propagated to the
> > > broker
> > > > > and
> > > > > > > > eventually the consumer. Currently, we only validate the
> > checksum
> > > > at
> > > > > > the
> > > > > > > > broker. We could further validate it at the consumer in the
> > > future.
> > > > > > > >
> > > > > > > > >
> > > > > > > > > 2. The consumer has a pretty solid mechanism to ensure it
> > hasnt
> > > > > > missed
> > > > > > > > any
> > > > > > > > > messages (i like the design by the way), but how does the
> > > > producer
> > > > > > know
> > > > > > > > > that
> > > > > > > > > all of its messages have been stored? (no apparent message
> id
> > > on
> > > > > that
> > > > > > > > side
> > > > > > > > > since the message id isnt known until the message is
> written
> > to
> > > > the
> > > > > > > > file).
> > > > > > > > > I'm especially curious how failover/replication could be
> > > > > implemented
> > > > > > > and
> > > > > > > > > I'm
> > > > > > > > > thinking that acks on the publisher side may help)
> > > > > > > > >
> > > > > > > >
> > > > > > > > The producer side auditing is not built-in. At LinkedIn, we
> do
> > > that
> > > > > by
> > > > > > > > generating an auditing event periodically in the eventhandler
> > of
> > > > the
> > > > > > > async
> > > > > > > > producer. The auditing event contains the number of events
> > > produced
> > > > > in
> > > > > > a
> > > > > > > > configured window (e.g., 10 minutes) and are sent to a
> separate
> > > > > topic.
> > > > > > > The
> > > > > > > > consumer can read the actual data and the auditing event and
> > > > compare
> > > > > > the
> > > > > > > > counts. See our paper (
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
> > > > > > > > )
> > > > > > > > for some more details.
> > > > > > > >
> > > > > > > >
> > > > > > > > >
> > > > > > > > > 3. Has the consumer's flow control been tested over high
> > > > > > > bandwidth*delay
> > > > > > > > > links? (what bandwidth can you get from a London consumer
> of
> > an
> > > > SF
> > > > > > > > > cluster?)
> > > > > > > > >
> > > > > > > > > Yes, we actually replicate kafka data across data centers,
> > > using
> > > > an
> > > > > > > > embedded consumer in a broker. Again, there is a bit more
> info
> > on
> > > > > this
> > > > > > in
> > > > > > > > our paper.
> > > > > > > >
> > > > > > > >
> > > > > > > > > 4. What kind of performance do you get if you set the
> > > producer's
> > > > > > > message
> > > > > > > > > delay to zero? (ie, is there a separate system call for
> each
> > > > > message?
> > > > > > > or
> > > > > > > > do
> > > > > > > > > you manage to aggregate messages into a smaller number of
> > > system
> > > > > > calls
> > > > > > > > even
> > > > > > > > > with a delay of 0?)
> > > > > > > > >
> > > > > > > > > I assume that you are referring to the flush interval. One
> > can
> > > > > > > configure
> > > > > > > > to
> > > > > > > > flush every message to disk. This will slow down the
> throughput
> > > > > > > > significantly.
> > > > > > > >
> > > > > > > >
> > > > > > > > > 5. Have you considered using a library like zeromq for the
> > > > > messaging
> > > > > > > > layer
> > > > > > > > > instead of rolling your own? (zeromq will handle #4 cleanly
> > at
> > > > > > millions
> > > > > > > > of
> > > > > > > > > messages per second and has clients in 20 languages)
> > > > > > > > >
> > > > > > > > > No. Our proprietary format allows us to support things like
> > > > > > compression
> > > > > > > > in
> > > > > > > > the future. However, we can definitely look into the zeromq
> > > format.
> > > > > Is
> > > > > > > > their
> > > > > > > > messaging layer easily extractable?
> > > > > > > >
> > > > > > > >
> > > > > > > > > 6. Do you have any plans to support intermediate processing
> > > > > elements
> > > > > > > the
> > > > > > > > > way
> > > > > > > > > Flume supports?
> > > > > > > > >
> > > > > > > > > For now, we are just focusing on getting the raw messaging
> > > layer
> > > > > > solid.
> > > > > > > > We
> > > > > > > > have worked a bit on streaming processing and will look into
> > that
> > > > > again
> > > > > > > in
> > > > > > > > the future.
> > > > > > > >
> > > > > > > >
> > > > > > > > > 7. The docs mention that new versions will only be released
> > > after
> > > > > > they
> > > > > > > > are
> > > > > > > > > in production at LinkedIn? Does that mean that the latest
> > > version
> > > > > of
> > > > > > > the
> > > > > > > > > source code is hidden at LinkedIn and contributors would
> have
> > > to
> > > > > > throw
> > > > > > > > > patches over the wall and wait months to get the integrated
> > > > > product?
> > > > > > > > >
> > > > > > > > > What we ran at LinkedIn is the same version in open source
> > and
> > > > > there
> > > > > > is
> > > > > > > > no
> > > > > > > > internal repository of Kafka at LinkedIn. We plan to maintain
> > > that
> > > > in
> > > > > > the
> > > > > > > > future.
> > > > > > > >
> > > > > > > >
> > > > > > > > > Thanks!
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Kafka questions

Posted by Olivier Pomel <ol...@datadoghq.com>.
Thanks, guys, this was a great thread. May be worth pointing to it in the
online docs as it asks and answers a lot of interesting questions about the
performance characteristics and tradeoffs made in Kafka.

How far out do you think built-in replication is?
Best,
O.



On Tue, Jul 19, 2011 at 3:23 PM, Jay Kreps <ja...@gmail.com> wrote:

> Agreed, no reason the policy to hand out messages should not be
> configurable. We were hoping to make the whole question irrelevant with the
> replication since then the producer can choose the replication level it
> wants and fsync durability should be less of a concern.
>
> I agree with your comment that a good implementation of streaming with acks
> being potentially superior.
>
> -Jay
>
> On Tue, Jul 19, 2011 at 11:15 AM, Paul Sutter <psutter@quantbench.com
> >wrote:
>
> > Jay,
> >
> > Ah - thanks for the clarification on the delay in the broker. It would be
> > nice to if that were a configuration option, so that the end user can
> > choose
> > only to forward messages that have been written to disk, or choose to
> have
> > the data forwarded immediately. When you implement replication data
> hitting
> > the disk will matter less.
> >
> > On the delay in the producer, I think it could best be resolved through
> > measurement. In your paper you compare two different approaches, and I'm
> > proposing a third:
> >
> > 1. Send and wait (single message, JMS style)
> > 2. Batch, send, and wait (Kafka today)
> > 3. Stream with ACKs
> >
> > Removing any wait for a reply should increase throughput, not decrease
> it,
> > so you're likely trading latency against potential CPU efficiency. And
> the
> > CPU savings is a question best resolved by measurement.
> >
> > I'd also encourage you to think about the WAN case. When you
> send-and-wait,
> > you need to send a buffer that is >> the bandwidth delay product to
> > approach
> > full line utilization, and the line will go idle for one RTT while you
> stop
> > to wait for a reply. The bandwidth*delay product can get large (10s of
> > megabytes), and end users will rarely understand the need to tune the
> batch
> > size to increase throughput. They'll just say it's slow over long
> > distances.
> >
> > All that said - your use case doesn't require minimizing latency or WAN
> > use,
> > so I can really understand if this isn't a priority for you.
> >
> > It's a well designed product that has had some real thought put into it.
> > It's a really promising system, thanks for taking the time to respond to
> my
> > comments.
> >
> > Paul
> >
> > On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > Ah, I think what you are describing in zeromq is essentially the
> > equivalent
> > > of group commit for the socket. Essentially you wait until the socket
> is
> > no
> > > longer writable and then begin to queue data. This is an interesting
> > idea.
> > > Of course it would only have a positive effect when you had already
> > > overflowed the socket buffer and were sending a very high throughput of
> > > small messages. That basically is a way to degrade an overloaded
> > > synchronous
> > > send into a batched send. This is not really the same as what we have
> > done,
> > > which is to allow the ability to trade off latency for throughput in a
> > > configurable manner. The reason the later is important is that we do
> not
> > > have a handful of producers sending at a rate that saturates the
> network
> > > I/O
> > > capacity of those servers (the case where the group commit would help)
> > but
> > > rather we have thousands of producers sending at a medium low volume,
> so
> > we
> > > would never hit that in our use case. The advantage of batching is
> fewer
> > > requests that hit the server, and larger packets. Where the group
> commit
> > > would help is for the synchronous producer benchmarks, where you could
> > > potentially get much better throughput. This is something we should
> > > consider
> > > adding.
> > >
> > > To be clear, though, we have not added latency in our layer, just made
> a
> > > configurable way to trade-off latency for throughput. This is
> > unambiguously
> > > a good thing, I think.
> > >
> > > With respect to mmap, i think you are misunderstanding where the
> latency
> > > comes from. We immediately write data to the filesystem with no delay
> > > whatsoever. This incurs the overhead of a system call, as you point
> out,
> > > which could be avoided by mmap, but that doesn't add much in the way of
> > > latency. The latency comes from the fact that we do not make the
> written
> > > data available to consumers until we fsync the file to "ensure" the
> > > durability of consumed messages. The frequency of the fsync is
> > > configurable,
> > > anything either immediate or with a time or # messages threshold. This
> > > again
> > > trades latency for throughput.
> > >
> > > -Jay
> > >
> > > On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <psutter@quantbench.com
> > > >wrote:
> > >
> > > > *Producer latency* - I'm not familiar with zeromq internals but my
> > > > understanding is that they send the first message(s) immediately and
> as
> > > TCP
> > > > queues up the data, it will eventually block as the send buffer
> fills,
> > > and
> > > > during this time messages can queue up, and thte net-net is that on
> > > average
> > > > the number of system calls is << the number of messages. The key is
> > > having
> > > > a
> > > > separate thread for network operations with very efficient thread
> > > > coordination.  Clearly Nagle is disabled (TCP_NODELAY) as Nagle is a
> > > blight
> > > > against humanity.
> > > >
> > > > Having any sort of delay adds latency. If every developer thinks its
> OK
> > > to
> > > > add a little latency in his layer, pretty soon you end up with 10
> > second
> > > > end
> > > > to end latency.
> > > >
> > > > Having an "accumulated message count" is also bad for WAN
> performance.
> > If
> > > > your "window size" is a set of delayed messages, the only way to deal
> > > with
> > > > a
> > > > large bandwidth*delay product is to delay a lot of messages, then
> send
> > > > them.
> > > > You can fit a lot of data into a fiber. Imagine a gigabit link with
> > 100ms
> > > > roundtrip time, you can store 100MB in the fiber. And you need a
> > > multiples
> > > > of that for buffering if you need to do a retransmit.
> > > >
> > > > *Broker Latency *- With mmap the memcpy() of the message should make
> > the
> > > > data available to a thread even in another process, the pages that
> you
> > > have
> > > > mapped are also in the buffer cache and available to a sendfile()
> call.
> > > or
> > > > at least I think so. The flush to physical disk (or msync() in this
> > case)
> > > > would still be delayed but without impacting end to end latency.
> > > >
> > > > That said, in benchmarks I have done the fastest IO with the lowest
> CPU
> > > > overhead is unbuffered (direct) IO (which is lower overhead than
> using
> > > the
> > > > buffer cache with or without memory mapping), but then you'd have to
> > > manage
> > > > your own buffer pool and run your broker in a single multithreaded
> > > process.
> > > > But thats getting more extreme. Just getting rid of this buffer write
> > > delay
> > > > by using memory mapping will remove a big chunk of latency.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <ja...@gmail.com>
> > wrote:
> > > >
> > > > > Hi Paul,
> > > > >
> > > > > We are definitely interested in lowering latency--lower is always
> > > > > better--but that was not a major concern for us so far (we were
> > > replacing
> > > > a
> > > > > system with 1 hour latency), so we haven't focused on it yet. As
> you
> > > > > describe latency in our setup at linkedin comes from batching on
> the
> > > > > frontend and batching on the kafka servers do to very lenient flush
> > > > > settings.
> > > > >
> > > > > I am interested in your comments on zeromq. Do they actually have a
> > > > better
> > > > > approach for this problem even when using TCP? If so I would be
> > > > interested
> > > > > to understand. The way I see things this is about trading
> throughput
> > > and
> > > > > latency. On the producer side you have only a few options:
> > immediately
> > > > > write
> > > > > the data to the socket buffer for sending or wait and see if the
> > > > > application
> > > > > writes more data. The OS will do this for you unless you set
> > > TCP_NODELAY,
> > > > > but the OS is relatively inflexible, it doesn't understand your
> data
> > so
> > > I
> > > > > think it just waits 200ms or until the socket buffer is full.
> > > > >
> > > > > The current approach in the async producer captures the same
> > tradeoff,
> > > > but
> > > > > a
> > > > > little more flexibly, it allows you to specify a max delay and max
> > > > > accumulated message count, data is written when either of those is
> > hit.
> > > > >
> > > > > Is it possible to better capture this tradeoff? Basically I am not
> > > aware
> > > > of
> > > > > any other trick here if you are using TCP, so i would be interested
> > in
> > > > what
> > > > > zeromq does if they are doing this better.
> > > > >
> > > > > We do indeed write each message set to the filesystem as it arrives
> > but
> > > > we
> > > > > distribute messages to consumers only after the write has been
> > flushed
> > > to
> > > > > disk, delaying (batching) that flush is the cause of the latency
> but
> > > also
> > > > > gives better use of IOPs by generating larger writes. Mmap would
> > remove
> > > > the
> > > > > system call (which would be good), but not the flush I think. As
> you
> > > say,
> > > > > adding replication allows giving stronger guarantees without
> actually
> > > > > caring
> > > > > about durability on a particular server which would make it
> possible
> > to
> > > > > distribute messages to consumers after ack from some number of
> other
> > > > > servers
> > > > > irrespective of flushing to disk.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <
> psutter@quantbench.com
> > >
> > > > > wrote:
> > > > >
> > > > > > Jun
> > > > > >
> > > > > > Thanks for your answers and the link to the paper - that helps a
> > lot,
> > > > > > especially the comment in the paper that 10 second end to end
> > latency
> > > > is
> > > > > > good enough for your intended use case.
> > > > > >
> > > > > > We're looking for much lower latencies, and the basic design of
> > Kafka
> > > > > feels
> > > > > > like it should support latencies in milliseconds with a few
> > changes.
> > > > > We're
> > > > > > either going to build our own system, or help develop something
> > that
> > > > > > already
> > > > > > exists, so please take my comments in the constructive way
> they're
> > > > > intended
> > > > > > (I realize the changes I'm suggesting are outside your intended
> use
> > > > case,
> > > > > > but if you're interested we may be able to provide a very capable
> > > > > developer
> > > > > > to help with the work, assuming we choose kafka over the other
> > > zillion
> > > > > > streaming systems that are coming out of the woodwork).
> > > > > >
> > > > > > a. *Producer "queue.time"* - In my question 4 below, I was
> > referring
> > > to
> > > > > the
> > > > > > producer queue time.  With a default value of 5 seconds, that
> > > accounts
> > > > > for
> > > > > > half your end to end latency. A system like zeromq is optimized
> to
> > > > write
> > > > > > data immediately without delay, but in such a way to minimizes
> the
> > > > number
> > > > > > of
> > > > > > system calls required during high throughput messages. Zeromq is
> no
> > > > > > nirvana,
> > > > > > but it has a number of nice properties.
> > > > > >
> > > > > > b. *Broker "log.default.flush.interval.ms"* - The default value
> of
> > 3
> > > > > > seconds
> > > > > > appears to be another significant source of latency in the
> system,
> > > > > assuming
> > > > > > that clients are unable to access data until it has been flushed.
> > > Since
> > > > > you
> > > > > > have wisely chosen to take advantage of the buffer cache as part
> of
> > > > your
> > > > > > system design, it seems that you could remove this latency
> > completely
> > > > by
> > > > > > memory mapping the partitions and memcpying each message as it
> > > arrives.
> > > > > > With
> > > > > > the right IPC mechanism clients could have immediate access to
> new
> > > > > > messages.
> > > > > >
> > > > > > c. *Batching, sync vs async, replication, and auditing*. Its
> > > > > understandable
> > > > > > that you've chosen a a forensic approach to producer reliability
> > > (after
> > > > > the
> > > > > > fact auditing), but when you implement replication it would be
> > really
> > > > > nice
> > > > > > to revise the producer protocol mechanisms. If you used a
> streaming
> > > > > > mechanism with producer offsets and ACKs, you could ensure
> reliable
> > > > > > delivery
> > > > > > of producer streams to multiple brokers without the need to
> choose
> > a
> > > > > "batch
> > > > > > size" or "queue.time". This could also give you active/active
> > > failover
> > > > of
> > > > > > brokers. This may also help in the WAN case (my question 3 below)
> > > > because
> > > > > > you will be able to adaptively stuff more and more data through
> the
> > > > fiber
> > > > > > for high bandwidth*delay links without having to choose a large
> > > "batch
> > > > > > size"
> > > > > > nor have the additional latency that entails. Oh, and it will
> help
> > > you
> > > > > deal
> > > > > > with CRC errors once you start checking for them.
> > > > > >
> > > > > > c. *Performance measurements* - I'd like to make a suggestion for
> > > your
> > > > > > performance measurements. Your benchmarks measure throughput, but
> a
> > > > > > throughput number is meaningless without an associated "% cpu
> > time".
> > > > > > Ideally
> > > > > > all measurements achieve wire speed (100MB/sec) at 0% CPU (since,
> > > after
> > > > > > all,
> > > > > > this is plumbing and we assume the cores in the system should
> have
> > > > > capacity
> > > > > > set aside for useful work too). Obviously nobody ever achieves
> > this,
> > > > but
> > > > > by
> > > > > > measuring it one can raise the bar in terms of optimization.
> > > > > >
> > > > > > Paul
> > > > > >
> > > > > > ps. Just for background, I am the cofounder at Quantcast where we
> > > > process
> > > > > > 3.5PB of data per day. These questions are related to my new
> > startup
> > > > > > Quantbench which will deal with financial market data where you
> > dont
> > > > want
> > > > > > any latency at all. And WAN issues are a big deal too.
> > Incidentally,
> > > I
> > > > > was
> > > > > > also founder of Orbital Data which was a WAN optimization company
> > so
> > > > I've
> > > > > > done a lot of work with protocols over long distances.
> > > > > >
> > > > > > On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <ju...@gmail.com>
> wrote:
> > > > > >
> > > > > > > Paul,
> > > > > > >
> > > > > > > Excellent questions. See my answers below. Thanks,
> > > > > > >
> > > > > > > On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <
> > > psutter@quantbench.com
> > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Kafka looks like an exciting project, thanks for opening it
> up.
> > > > > > > >
> > > > > > > > I have a few questions:
> > > > > > > >
> > > > > > > > 1. Are checksums end to end (ie, created by the producer and
> > > > checked
> > > > > by
> > > > > > > the
> > > > > > > > consumer)? or are they only used to confirm buffercache
> > behavior
> > > on
> > > > > > disk
> > > > > > > as
> > > > > > > > mentioned in the documentation? Bit errors occur vastly more
> > > often
> > > > > than
> > > > > > > > most
> > > > > > > > people assume, often because of device driver bugs. TCP only
> > > > detects
> > > > > 1
> > > > > > > > error
> > > > > > > > in 65536, so errors can flow through (if you like I can send
> > > links
> > > > to
> > > > > > > > papers
> > > > > > > > describing the need for checksums everywhere).
> > > > > > > >
> > > > > > >
> > > > > > > Checksum is generated at the producer and propagated to the
> > broker
> > > > and
> > > > > > > eventually the consumer. Currently, we only validate the
> checksum
> > > at
> > > > > the
> > > > > > > broker. We could further validate it at the consumer in the
> > future.
> > > > > > >
> > > > > > > >
> > > > > > > > 2. The consumer has a pretty solid mechanism to ensure it
> hasnt
> > > > > missed
> > > > > > > any
> > > > > > > > messages (i like the design by the way), but how does the
> > > producer
> > > > > know
> > > > > > > > that
> > > > > > > > all of its messages have been stored? (no apparent message id
> > on
> > > > that
> > > > > > > side
> > > > > > > > since the message id isnt known until the message is written
> to
> > > the
> > > > > > > file).
> > > > > > > > I'm especially curious how failover/replication could be
> > > > implemented
> > > > > > and
> > > > > > > > I'm
> > > > > > > > thinking that acks on the publisher side may help)
> > > > > > > >
> > > > > > >
> > > > > > > The producer side auditing is not built-in. At LinkedIn, we do
> > that
> > > > by
> > > > > > > generating an auditing event periodically in the eventhandler
> of
> > > the
> > > > > > async
> > > > > > > producer. The auditing event contains the number of events
> > produced
> > > > in
> > > > > a
> > > > > > > configured window (e.g., 10 minutes) and are sent to a separate
> > > > topic.
> > > > > > The
> > > > > > > consumer can read the actual data and the auditing event and
> > > compare
> > > > > the
> > > > > > > counts. See our paper (
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
> > > > > > > )
> > > > > > > for some more details.
> > > > > > >
> > > > > > >
> > > > > > > >
> > > > > > > > 3. Has the consumer's flow control been tested over high
> > > > > > bandwidth*delay
> > > > > > > > links? (what bandwidth can you get from a London consumer of
> an
> > > SF
> > > > > > > > cluster?)
> > > > > > > >
> > > > > > > > Yes, we actually replicate kafka data across data centers,
> > using
> > > an
> > > > > > > embedded consumer in a broker. Again, there is a bit more info
> on
> > > > this
> > > > > in
> > > > > > > our paper.
> > > > > > >
> > > > > > >
> > > > > > > > 4. What kind of performance do you get if you set the
> > producer's
> > > > > > message
> > > > > > > > delay to zero? (ie, is there a separate system call for each
> > > > message?
> > > > > > or
> > > > > > > do
> > > > > > > > you manage to aggregate messages into a smaller number of
> > system
> > > > > calls
> > > > > > > even
> > > > > > > > with a delay of 0?)
> > > > > > > >
> > > > > > > > I assume that you are referring to the flush interval. One
> can
> > > > > > configure
> > > > > > > to
> > > > > > > flush every message to disk. This will slow down the throughput
> > > > > > > significantly.
> > > > > > >
> > > > > > >
> > > > > > > > 5. Have you considered using a library like zeromq for the
> > > > messaging
> > > > > > > layer
> > > > > > > > instead of rolling your own? (zeromq will handle #4 cleanly
> at
> > > > > millions
> > > > > > > of
> > > > > > > > messages per second and has clients in 20 languages)
> > > > > > > >
> > > > > > > > No. Our proprietary format allows us to support things like
> > > > > compression
> > > > > > > in
> > > > > > > the future. However, we can definitely look into the zeromq
> > format.
> > > > Is
> > > > > > > their
> > > > > > > messaging layer easily extractable?
> > > > > > >
> > > > > > >
> > > > > > > > 6. Do you have any plans to support intermediate processing
> > > > elements
> > > > > > the
> > > > > > > > way
> > > > > > > > Flume supports?
> > > > > > > >
> > > > > > > > For now, we are just focusing on getting the raw messaging
> > layer
> > > > > solid.
> > > > > > > We
> > > > > > > have worked a bit on streaming processing and will look into
> that
> > > > again
> > > > > > in
> > > > > > > the future.
> > > > > > >
> > > > > > >
> > > > > > > > 7. The docs mention that new versions will only be released
> > after
> > > > > they
> > > > > > > are
> > > > > > > > in production at LinkedIn? Does that mean that the latest
> > version
> > > > of
> > > > > > the
> > > > > > > > source code is hidden at LinkedIn and contributors would have
> > to
> > > > > throw
> > > > > > > > patches over the wall and wait months to get the integrated
> > > > product?
> > > > > > > >
> > > > > > > > What we ran at LinkedIn is the same version in open source
> and
> > > > there
> > > > > is
> > > > > > > no
> > > > > > > internal repository of Kafka at LinkedIn. We plan to maintain
> > that
> > > in
> > > > > the
> > > > > > > future.
> > > > > > >
> > > > > > >
> > > > > > > > Thanks!
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Kafka questions

Posted by Paul Sutter <ps...@quantbench.com>.
If the unflushed messages havent been acked to the publisher, they havent
been lost in the system.

On Wed, Jul 20, 2011 at 9:09 AM, Jun Rao <ju...@gmail.com> wrote:

> Paul,
>
> The only concern is that if we expose unflushed messages, those messages
> could disappear after a broker machine restart.
>
> Jun
>
> On Tue, Jul 19, 2011 at 2:02 PM, Paul Sutter <ps...@quantbench.com>
> wrote:
>
> > One more suggestion:
> >
> > Even before you have replication, it seems that you could delay producer
> > side acks until after the data is recorded to disk, and still pass the
> data
> > forward to consumers immediately.
> >
> >
> >
> > On Jul 19, 2011, at 12:23 PM, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > Agreed, no reason the policy to hand out messages should not be
> > > configurable. We were hoping to make the whole question irrelevant with
> > the
> > > replication since then the producer can choose the replication level it
> > > wants and fsync durability should be less of a concern.
> > >
> > > I agree with your comment that a good implementation of streaming with
> > acks
> > > being potentially superior.
> > >
> > > -Jay
> > >
> > > On Tue, Jul 19, 2011 at 11:15 AM, Paul Sutter <psutter@quantbench.com
> > >wrote:
> > >
> > >> Jay,
> > >>
> > >> Ah - thanks for the clarification on the delay in the broker. It would
> > be
> > >> nice to if that were a configuration option, so that the end user can
> > >> choose
> > >> only to forward messages that have been written to disk, or choose to
> > have
> > >> the data forwarded immediately. When you implement replication data
> > hitting
> > >> the disk will matter less.
> > >>
> > >> On the delay in the producer, I think it could best be resolved
> through
> > >> measurement. In your paper you compare two different approaches, and
> I'm
> > >> proposing a third:
> > >>
> > >> 1. Send and wait (single message, JMS style)
> > >> 2. Batch, send, and wait (Kafka today)
> > >> 3. Stream with ACKs
> > >>
> > >> Removing any wait for a reply should increase throughput, not decrease
> > it,
> > >> so you're likely trading latency against potential CPU efficiency. And
> > the
> > >> CPU savings is a question best resolved by measurement.
> > >>
> > >> I'd also encourage you to think about the WAN case. When you
> > send-and-wait,
> > >> you need to send a buffer that is >> the bandwidth delay product to
> > >> approach
> > >> full line utilization, and the line will go idle for one RTT while you
> > stop
> > >> to wait for a reply. The bandwidth*delay product can get large (10s of
> > >> megabytes), and end users will rarely understand the need to tune the
> > batch
> > >> size to increase throughput. They'll just say it's slow over long
> > >> distances.
> > >>
> > >> All that said - your use case doesn't require minimizing latency or
> WAN
> > >> use,
> > >> so I can really understand if this isn't a priority for you.
> > >>
> > >> It's a well designed product that has had some real thought put into
> it.
> > >> It's a really promising system, thanks for taking the time to respond
> to
> > my
> > >> comments.
> > >>
> > >> Paul
> > >>
> > >> On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <ja...@gmail.com>
> > wrote:
> > >>
> > >>> Ah, I think what you are describing in zeromq is essentially the
> > >> equivalent
> > >>> of group commit for the socket. Essentially you wait until the socket
> > is
> > >> no
> > >>> longer writable and then begin to queue data. This is an interesting
> > >> idea.
> > >>> Of course it would only have a positive effect when you had already
> > >>> overflowed the socket buffer and were sending a very high throughput
> of
> > >>> small messages. That basically is a way to degrade an overloaded
> > >>> synchronous
> > >>> send into a batched send. This is not really the same as what we have
> > >> done,
> > >>> which is to allow the ability to trade off latency for throughput in
> a
> > >>> configurable manner. The reason the later is important is that we do
> > not
> > >>> have a handful of producers sending at a rate that saturates the
> > network
> > >>> I/O
> > >>> capacity of those servers (the case where the group commit would
> help)
> > >> but
> > >>> rather we have thousands of producers sending at a medium low volume,
> > so
> > >> we
> > >>> would never hit that in our use case. The advantage of batching is
> > fewer
> > >>> requests that hit the server, and larger packets. Where the group
> > commit
> > >>> would help is for the synchronous producer benchmarks, where you
> could
> > >>> potentially get much better throughput. This is something we should
> > >>> consider
> > >>> adding.
> > >>>
> > >>> To be clear, though, we have not added latency in our layer, just
> made
> > a
> > >>> configurable way to trade-off latency for throughput. This is
> > >> unambiguously
> > >>> a good thing, I think.
> > >>>
> > >>> With respect to mmap, i think you are misunderstanding where the
> > latency
> > >>> comes from. We immediately write data to the filesystem with no delay
> > >>> whatsoever. This incurs the overhead of a system call, as you point
> > out,
> > >>> which could be avoided by mmap, but that doesn't add much in the way
> of
> > >>> latency. The latency comes from the fact that we do not make the
> > written
> > >>> data available to consumers until we fsync the file to "ensure" the
> > >>> durability of consumed messages. The frequency of the fsync is
> > >>> configurable,
> > >>> anything either immediate or with a time or # messages threshold.
> This
> > >>> again
> > >>> trades latency for throughput.
> > >>>
> > >>> -Jay
> > >>>
> > >>> On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <
> psutter@quantbench.com
> > >>>> wrote:
> > >>>
> > >>>> *Producer latency* - I'm not familiar with zeromq internals but my
> > >>>> understanding is that they send the first message(s) immediately and
> > as
> > >>> TCP
> > >>>> queues up the data, it will eventually block as the send buffer
> fills,
> > >>> and
> > >>>> during this time messages can queue up, and thte net-net is that on
> > >>> average
> > >>>> the number of system calls is << the number of messages. The key is
> > >>> having
> > >>>> a
> > >>>> separate thread for network operations with very efficient thread
> > >>>> coordination.  Clearly Nagle is disabled (TCP_NODELAY) as Nagle is a
> > >>> blight
> > >>>> against humanity.
> > >>>>
> > >>>> Having any sort of delay adds latency. If every developer thinks its
> > OK
> > >>> to
> > >>>> add a little latency in his layer, pretty soon you end up with 10
> > >> second
> > >>>> end
> > >>>> to end latency.
> > >>>>
> > >>>> Having an "accumulated message count" is also bad for WAN
> performance.
> > >> If
> > >>>> your "window size" is a set of delayed messages, the only way to
> deal
> > >>> with
> > >>>> a
> > >>>> large bandwidth*delay product is to delay a lot of messages, then
> send
> > >>>> them.
> > >>>> You can fit a lot of data into a fiber. Imagine a gigabit link with
> > >> 100ms
> > >>>> roundtrip time, you can store 100MB in the fiber. And you need a
> > >>> multiples
> > >>>> of that for buffering if you need to do a retransmit.
> > >>>>
> > >>>> *Broker Latency *- With mmap the memcpy() of the message should make
> > >> the
> > >>>> data available to a thread even in another process, the pages that
> you
> > >>> have
> > >>>> mapped are also in the buffer cache and available to a sendfile()
> > call.
> > >>> or
> > >>>> at least I think so. The flush to physical disk (or msync() in this
> > >> case)
> > >>>> would still be delayed but without impacting end to end latency.
> > >>>>
> > >>>> That said, in benchmarks I have done the fastest IO with the lowest
> > CPU
> > >>>> overhead is unbuffered (direct) IO (which is lower overhead than
> using
> > >>> the
> > >>>> buffer cache with or without memory mapping), but then you'd have to
> > >>> manage
> > >>>> your own buffer pool and run your broker in a single multithreaded
> > >>> process.
> > >>>> But thats getting more extreme. Just getting rid of this buffer
> write
> > >>> delay
> > >>>> by using memory mapping will remove a big chunk of latency.
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <ja...@gmail.com>
> > >> wrote:
> > >>>>
> > >>>>> Hi Paul,
> > >>>>>
> > >>>>> We are definitely interested in lowering latency--lower is always
> > >>>>> better--but that was not a major concern for us so far (we were
> > >>> replacing
> > >>>> a
> > >>>>> system with 1 hour latency), so we haven't focused on it yet. As
> you
> > >>>>> describe latency in our setup at linkedin comes from batching on
> the
> > >>>>> frontend and batching on the kafka servers do to very lenient flush
> > >>>>> settings.
> > >>>>>
> > >>>>> I am interested in your comments on zeromq. Do they actually have a
> > >>>> better
> > >>>>> approach for this problem even when using TCP? If so I would be
> > >>>> interested
> > >>>>> to understand. The way I see things this is about trading
> throughput
> > >>> and
> > >>>>> latency. On the producer side you have only a few options:
> > >> immediately
> > >>>>> write
> > >>>>> the data to the socket buffer for sending or wait and see if the
> > >>>>> application
> > >>>>> writes more data. The OS will do this for you unless you set
> > >>> TCP_NODELAY,
> > >>>>> but the OS is relatively inflexible, it doesn't understand your
> data
> > >> so
> > >>> I
> > >>>>> think it just waits 200ms or until the socket buffer is full.
> > >>>>>
> > >>>>> The current approach in the async producer captures the same
> > >> tradeoff,
> > >>>> but
> > >>>>> a
> > >>>>> little more flexibly, it allows you to specify a max delay and max
> > >>>>> accumulated message count, data is written when either of those is
> > >> hit.
> > >>>>>
> > >>>>> Is it possible to better capture this tradeoff? Basically I am not
> > >>> aware
> > >>>> of
> > >>>>> any other trick here if you are using TCP, so i would be interested
> > >> in
> > >>>> what
> > >>>>> zeromq does if they are doing this better.
> > >>>>>
> > >>>>> We do indeed write each message set to the filesystem as it arrives
> > >> but
> > >>>> we
> > >>>>> distribute messages to consumers only after the write has been
> > >> flushed
> > >>> to
> > >>>>> disk, delaying (batching) that flush is the cause of the latency
> but
> > >>> also
> > >>>>> gives better use of IOPs by generating larger writes. Mmap would
> > >> remove
> > >>>> the
> > >>>>> system call (which would be good), but not the flush I think. As
> you
> > >>> say,
> > >>>>> adding replication allows giving stronger guarantees without
> actually
> > >>>>> caring
> > >>>>> about durability on a particular server which would make it
> possible
> > >> to
> > >>>>> distribute messages to consumers after ack from some number of
> other
> > >>>>> servers
> > >>>>> irrespective of flushing to disk.
> > >>>>>
> > >>>>> -Jay
> > >>>>>
> > >>>>> On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <
> psutter@quantbench.com
> > >>>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Jun
> > >>>>>>
> > >>>>>> Thanks for your answers and the link to the paper - that helps a
> > >> lot,
> > >>>>>> especially the comment in the paper that 10 second end to end
> > >> latency
> > >>>> is
> > >>>>>> good enough for your intended use case.
> > >>>>>>
> > >>>>>> We're looking for much lower latencies, and the basic design of
> > >> Kafka
> > >>>>> feels
> > >>>>>> like it should support latencies in milliseconds with a few
> > >> changes.
> > >>>>> We're
> > >>>>>> either going to build our own system, or help develop something
> > >> that
> > >>>>>> already
> > >>>>>> exists, so please take my comments in the constructive way they're
> > >>>>> intended
> > >>>>>> (I realize the changes I'm suggesting are outside your intended
> use
> > >>>> case,
> > >>>>>> but if you're interested we may be able to provide a very capable
> > >>>>> developer
> > >>>>>> to help with the work, assuming we choose kafka over the other
> > >>> zillion
> > >>>>>> streaming systems that are coming out of the woodwork).
> > >>>>>>
> > >>>>>> a. *Producer "queue.time"* - In my question 4 below, I was
> > >> referring
> > >>> to
> > >>>>> the
> > >>>>>> producer queue time.  With a default value of 5 seconds, that
> > >>> accounts
> > >>>>> for
> > >>>>>> half your end to end latency. A system like zeromq is optimized to
> > >>>> write
> > >>>>>> data immediately without delay, but in such a way to minimizes the
> > >>>> number
> > >>>>>> of
> > >>>>>> system calls required during high throughput messages. Zeromq is
> no
> > >>>>>> nirvana,
> > >>>>>> but it has a number of nice properties.
> > >>>>>>
> > >>>>>> b. *Broker "log.default.flush.interval.ms"* - The default value
> of
> > >> 3
> > >>>>>> seconds
> > >>>>>> appears to be another significant source of latency in the system,
> > >>>>> assuming
> > >>>>>> that clients are unable to access data until it has been flushed.
> > >>> Since
> > >>>>> you
> > >>>>>> have wisely chosen to take advantage of the buffer cache as part
> of
> > >>>> your
> > >>>>>> system design, it seems that you could remove this latency
> > >> completely
> > >>>> by
> > >>>>>> memory mapping the partitions and memcpying each message as it
> > >>> arrives.
> > >>>>>> With
> > >>>>>> the right IPC mechanism clients could have immediate access to new
> > >>>>>> messages.
> > >>>>>>
> > >>>>>> c. *Batching, sync vs async, replication, and auditing*. Its
> > >>>>> understandable
> > >>>>>> that you've chosen a a forensic approach to producer reliability
> > >>> (after
> > >>>>> the
> > >>>>>> fact auditing), but when you implement replication it would be
> > >> really
> > >>>>> nice
> > >>>>>> to revise the producer protocol mechanisms. If you used a
> streaming
> > >>>>>> mechanism with producer offsets and ACKs, you could ensure
> reliable
> > >>>>>> delivery
> > >>>>>> of producer streams to multiple brokers without the need to choose
> > >> a
> > >>>>> "batch
> > >>>>>> size" or "queue.time". This could also give you active/active
> > >>> failover
> > >>>> of
> > >>>>>> brokers. This may also help in the WAN case (my question 3 below)
> > >>>> because
> > >>>>>> you will be able to adaptively stuff more and more data through
> the
> > >>>> fiber
> > >>>>>> for high bandwidth*delay links without having to choose a large
> > >>> "batch
> > >>>>>> size"
> > >>>>>> nor have the additional latency that entails. Oh, and it will help
> > >>> you
> > >>>>> deal
> > >>>>>> with CRC errors once you start checking for them.
> > >>>>>>
> > >>>>>> c. *Performance measurements* - I'd like to make a suggestion for
> > >>> your
> > >>>>>> performance measurements. Your benchmarks measure throughput, but
> a
> > >>>>>> throughput number is meaningless without an associated "% cpu
> > >> time".
> > >>>>>> Ideally
> > >>>>>> all measurements achieve wire speed (100MB/sec) at 0% CPU (since,
> > >>> after
> > >>>>>> all,
> > >>>>>> this is plumbing and we assume the cores in the system should have
> > >>>>> capacity
> > >>>>>> set aside for useful work too). Obviously nobody ever achieves
> > >> this,
> > >>>> but
> > >>>>> by
> > >>>>>> measuring it one can raise the bar in terms of optimization.
> > >>>>>>
> > >>>>>> Paul
> > >>>>>>
> > >>>>>> ps. Just for background, I am the cofounder at Quantcast where we
> > >>>> process
> > >>>>>> 3.5PB of data per day. These questions are related to my new
> > >> startup
> > >>>>>> Quantbench which will deal with financial market data where you
> > >> dont
> > >>>> want
> > >>>>>> any latency at all. And WAN issues are a big deal too.
> > >> Incidentally,
> > >>> I
> > >>>>> was
> > >>>>>> also founder of Orbital Data which was a WAN optimization company
> > >> so
> > >>>> I've
> > >>>>>> done a lot of work with protocols over long distances.
> > >>>>>>
> > >>>>>> On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <ju...@gmail.com>
> wrote:
> > >>>>>>
> > >>>>>>> Paul,
> > >>>>>>>
> > >>>>>>> Excellent questions. See my answers below. Thanks,
> > >>>>>>>
> > >>>>>>> On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <
> > >>> psutter@quantbench.com
> > >>>>>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Kafka looks like an exciting project, thanks for opening it up.
> > >>>>>>>>
> > >>>>>>>> I have a few questions:
> > >>>>>>>>
> > >>>>>>>> 1. Are checksums end to end (ie, created by the producer and
> > >>>> checked
> > >>>>> by
> > >>>>>>> the
> > >>>>>>>> consumer)? or are they only used to confirm buffercache
> > >> behavior
> > >>> on
> > >>>>>> disk
> > >>>>>>> as
> > >>>>>>>> mentioned in the documentation? Bit errors occur vastly more
> > >>> often
> > >>>>> than
> > >>>>>>>> most
> > >>>>>>>> people assume, often because of device driver bugs. TCP only
> > >>>> detects
> > >>>>> 1
> > >>>>>>>> error
> > >>>>>>>> in 65536, so errors can flow through (if you like I can send
> > >>> links
> > >>>> to
> > >>>>>>>> papers
> > >>>>>>>> describing the need for checksums everywhere).
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>> Checksum is generated at the producer and propagated to the
> > >> broker
> > >>>> and
> > >>>>>>> eventually the consumer. Currently, we only validate the checksum
> > >>> at
> > >>>>> the
> > >>>>>>> broker. We could further validate it at the consumer in the
> > >> future.
> > >>>>>>>
> > >>>>>>>>
> > >>>>>>>> 2. The consumer has a pretty solid mechanism to ensure it hasnt
> > >>>>> missed
> > >>>>>>> any
> > >>>>>>>> messages (i like the design by the way), but how does the
> > >>> producer
> > >>>>> know
> > >>>>>>>> that
> > >>>>>>>> all of its messages have been stored? (no apparent message id
> > >> on
> > >>>> that
> > >>>>>>> side
> > >>>>>>>> since the message id isnt known until the message is written to
> > >>> the
> > >>>>>>> file).
> > >>>>>>>> I'm especially curious how failover/replication could be
> > >>>> implemented
> > >>>>>> and
> > >>>>>>>> I'm
> > >>>>>>>> thinking that acks on the publisher side may help)
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>> The producer side auditing is not built-in. At LinkedIn, we do
> > >> that
> > >>>> by
> > >>>>>>> generating an auditing event periodically in the eventhandler of
> > >>> the
> > >>>>>> async
> > >>>>>>> producer. The auditing event contains the number of events
> > >> produced
> > >>>> in
> > >>>>> a
> > >>>>>>> configured window (e.g., 10 minutes) and are sent to a separate
> > >>>> topic.
> > >>>>>> The
> > >>>>>>> consumer can read the actual data and the auditing event and
> > >>> compare
> > >>>>> the
> > >>>>>>> counts. See our paper (
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
> > >>>>>>> )
> > >>>>>>> for some more details.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>>
> > >>>>>>>> 3. Has the consumer's flow control been tested over high
> > >>>>>> bandwidth*delay
> > >>>>>>>> links? (what bandwidth can you get from a London consumer of an
> > >>> SF
> > >>>>>>>> cluster?)
> > >>>>>>>>
> > >>>>>>>> Yes, we actually replicate kafka data across data centers,
> > >> using
> > >>> an
> > >>>>>>> embedded consumer in a broker. Again, there is a bit more info on
> > >>>> this
> > >>>>> in
> > >>>>>>> our paper.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>> 4. What kind of performance do you get if you set the
> > >> producer's
> > >>>>>> message
> > >>>>>>>> delay to zero? (ie, is there a separate system call for each
> > >>>> message?
> > >>>>>> or
> > >>>>>>> do
> > >>>>>>>> you manage to aggregate messages into a smaller number of
> > >> system
> > >>>>> calls
> > >>>>>>> even
> > >>>>>>>> with a delay of 0?)
> > >>>>>>>>
> > >>>>>>>> I assume that you are referring to the flush interval. One can
> > >>>>>> configure
> > >>>>>>> to
> > >>>>>>> flush every message to disk. This will slow down the throughput
> > >>>>>>> significantly.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>> 5. Have you considered using a library like zeromq for the
> > >>>> messaging
> > >>>>>>> layer
> > >>>>>>>> instead of rolling your own? (zeromq will handle #4 cleanly at
> > >>>>> millions
> > >>>>>>> of
> > >>>>>>>> messages per second and has clients in 20 languages)
> > >>>>>>>>
> > >>>>>>>> No. Our proprietary format allows us to support things like
> > >>>>> compression
> > >>>>>>> in
> > >>>>>>> the future. However, we can definitely look into the zeromq
> > >> format.
> > >>>> Is
> > >>>>>>> their
> > >>>>>>> messaging layer easily extractable?
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>> 6. Do you have any plans to support intermediate processing
> > >>>> elements
> > >>>>>> the
> > >>>>>>>> way
> > >>>>>>>> Flume supports?
> > >>>>>>>>
> > >>>>>>>> For now, we are just focusing on getting the raw messaging
> > >> layer
> > >>>>> solid.
> > >>>>>>> We
> > >>>>>>> have worked a bit on streaming processing and will look into that
> > >>>> again
> > >>>>>> in
> > >>>>>>> the future.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>> 7. The docs mention that new versions will only be released
> > >> after
> > >>>>> they
> > >>>>>>> are
> > >>>>>>>> in production at LinkedIn? Does that mean that the latest
> > >> version
> > >>>> of
> > >>>>>> the
> > >>>>>>>> source code is hidden at LinkedIn and contributors would have
> > >> to
> > >>>>> throw
> > >>>>>>>> patches over the wall and wait months to get the integrated
> > >>>> product?
> > >>>>>>>>
> > >>>>>>>> What we ran at LinkedIn is the same version in open source and
> > >>>> there
> > >>>>> is
> > >>>>>>> no
> > >>>>>>> internal repository of Kafka at LinkedIn. We plan to maintain
> > >> that
> > >>> in
> > >>>>> the
> > >>>>>>> future.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>> Thanks!
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
>

Re: Kafka questions

Posted by Jun Rao <ju...@gmail.com>.
Paul,

The only concern is that if we expose unflushed messages, those messages
could disappear after a broker machine restart.

Jun

On Tue, Jul 19, 2011 at 2:02 PM, Paul Sutter <ps...@quantbench.com> wrote:

> One more suggestion:
>
> Even before you have replication, it seems that you could delay producer
> side acks until after the data is recorded to disk, and still pass the data
> forward to consumers immediately.
>
>
>
> On Jul 19, 2011, at 12:23 PM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Agreed, no reason the policy to hand out messages should not be
> > configurable. We were hoping to make the whole question irrelevant with
> the
> > replication since then the producer can choose the replication level it
> > wants and fsync durability should be less of a concern.
> >
> > I agree with your comment that a good implementation of streaming with
> acks
> > being potentially superior.
> >
> > -Jay
> >
> > On Tue, Jul 19, 2011 at 11:15 AM, Paul Sutter <psutter@quantbench.com
> >wrote:
> >
> >> Jay,
> >>
> >> Ah - thanks for the clarification on the delay in the broker. It would
> be
> >> nice to if that were a configuration option, so that the end user can
> >> choose
> >> only to forward messages that have been written to disk, or choose to
> have
> >> the data forwarded immediately. When you implement replication data
> hitting
> >> the disk will matter less.
> >>
> >> On the delay in the producer, I think it could best be resolved through
> >> measurement. In your paper you compare two different approaches, and I'm
> >> proposing a third:
> >>
> >> 1. Send and wait (single message, JMS style)
> >> 2. Batch, send, and wait (Kafka today)
> >> 3. Stream with ACKs
> >>
> >> Removing any wait for a reply should increase throughput, not decrease
> it,
> >> so you're likely trading latency against potential CPU efficiency. And
> the
> >> CPU savings is a question best resolved by measurement.
> >>
> >> I'd also encourage you to think about the WAN case. When you
> send-and-wait,
> >> you need to send a buffer that is >> the bandwidth delay product to
> >> approach
> >> full line utilization, and the line will go idle for one RTT while you
> stop
> >> to wait for a reply. The bandwidth*delay product can get large (10s of
> >> megabytes), and end users will rarely understand the need to tune the
> batch
> >> size to increase throughput. They'll just say it's slow over long
> >> distances.
> >>
> >> All that said - your use case doesn't require minimizing latency or WAN
> >> use,
> >> so I can really understand if this isn't a priority for you.
> >>
> >> It's a well designed product that has had some real thought put into it.
> >> It's a really promising system, thanks for taking the time to respond to
> my
> >> comments.
> >>
> >> Paul
> >>
> >> On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <ja...@gmail.com>
> wrote:
> >>
> >>> Ah, I think what you are describing in zeromq is essentially the
> >> equivalent
> >>> of group commit for the socket. Essentially you wait until the socket
> is
> >> no
> >>> longer writable and then begin to queue data. This is an interesting
> >> idea.
> >>> Of course it would only have a positive effect when you had already
> >>> overflowed the socket buffer and were sending a very high throughput of
> >>> small messages. That basically is a way to degrade an overloaded
> >>> synchronous
> >>> send into a batched send. This is not really the same as what we have
> >> done,
> >>> which is to allow the ability to trade off latency for throughput in a
> >>> configurable manner. The reason the later is important is that we do
> not
> >>> have a handful of producers sending at a rate that saturates the
> network
> >>> I/O
> >>> capacity of those servers (the case where the group commit would help)
> >> but
> >>> rather we have thousands of producers sending at a medium low volume,
> so
> >> we
> >>> would never hit that in our use case. The advantage of batching is
> fewer
> >>> requests that hit the server, and larger packets. Where the group
> commit
> >>> would help is for the synchronous producer benchmarks, where you could
> >>> potentially get much better throughput. This is something we should
> >>> consider
> >>> adding.
> >>>
> >>> To be clear, though, we have not added latency in our layer, just made
> a
> >>> configurable way to trade-off latency for throughput. This is
> >> unambiguously
> >>> a good thing, I think.
> >>>
> >>> With respect to mmap, i think you are misunderstanding where the
> latency
> >>> comes from. We immediately write data to the filesystem with no delay
> >>> whatsoever. This incurs the overhead of a system call, as you point
> out,
> >>> which could be avoided by mmap, but that doesn't add much in the way of
> >>> latency. The latency comes from the fact that we do not make the
> written
> >>> data available to consumers until we fsync the file to "ensure" the
> >>> durability of consumed messages. The frequency of the fsync is
> >>> configurable,
> >>> anything either immediate or with a time or # messages threshold. This
> >>> again
> >>> trades latency for throughput.
> >>>
> >>> -Jay
> >>>
> >>> On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <psutter@quantbench.com
> >>>> wrote:
> >>>
> >>>> *Producer latency* - I'm not familiar with zeromq internals but my
> >>>> understanding is that they send the first message(s) immediately and
> as
> >>> TCP
> >>>> queues up the data, it will eventually block as the send buffer fills,
> >>> and
> >>>> during this time messages can queue up, and thte net-net is that on
> >>> average
> >>>> the number of system calls is << the number of messages. The key is
> >>> having
> >>>> a
> >>>> separate thread for network operations with very efficient thread
> >>>> coordination.  Clearly Nagle is disabled (TCP_NODELAY) as Nagle is a
> >>> blight
> >>>> against humanity.
> >>>>
> >>>> Having any sort of delay adds latency. If every developer thinks its
> OK
> >>> to
> >>>> add a little latency in his layer, pretty soon you end up with 10
> >> second
> >>>> end
> >>>> to end latency.
> >>>>
> >>>> Having an "accumulated message count" is also bad for WAN performance.
> >> If
> >>>> your "window size" is a set of delayed messages, the only way to deal
> >>> with
> >>>> a
> >>>> large bandwidth*delay product is to delay a lot of messages, then send
> >>>> them.
> >>>> You can fit a lot of data into a fiber. Imagine a gigabit link with
> >> 100ms
> >>>> roundtrip time, you can store 100MB in the fiber. And you need a
> >>> multiples
> >>>> of that for buffering if you need to do a retransmit.
> >>>>
> >>>> *Broker Latency *- With mmap the memcpy() of the message should make
> >> the
> >>>> data available to a thread even in another process, the pages that you
> >>> have
> >>>> mapped are also in the buffer cache and available to a sendfile()
> call.
> >>> or
> >>>> at least I think so. The flush to physical disk (or msync() in this
> >> case)
> >>>> would still be delayed but without impacting end to end latency.
> >>>>
> >>>> That said, in benchmarks I have done the fastest IO with the lowest
> CPU
> >>>> overhead is unbuffered (direct) IO (which is lower overhead than using
> >>> the
> >>>> buffer cache with or without memory mapping), but then you'd have to
> >>> manage
> >>>> your own buffer pool and run your broker in a single multithreaded
> >>> process.
> >>>> But thats getting more extreme. Just getting rid of this buffer write
> >>> delay
> >>>> by using memory mapping will remove a big chunk of latency.
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <ja...@gmail.com>
> >> wrote:
> >>>>
> >>>>> Hi Paul,
> >>>>>
> >>>>> We are definitely interested in lowering latency--lower is always
> >>>>> better--but that was not a major concern for us so far (we were
> >>> replacing
> >>>> a
> >>>>> system with 1 hour latency), so we haven't focused on it yet. As you
> >>>>> describe latency in our setup at linkedin comes from batching on the
> >>>>> frontend and batching on the kafka servers do to very lenient flush
> >>>>> settings.
> >>>>>
> >>>>> I am interested in your comments on zeromq. Do they actually have a
> >>>> better
> >>>>> approach for this problem even when using TCP? If so I would be
> >>>> interested
> >>>>> to understand. The way I see things this is about trading throughput
> >>> and
> >>>>> latency. On the producer side you have only a few options:
> >> immediately
> >>>>> write
> >>>>> the data to the socket buffer for sending or wait and see if the
> >>>>> application
> >>>>> writes more data. The OS will do this for you unless you set
> >>> TCP_NODELAY,
> >>>>> but the OS is relatively inflexible, it doesn't understand your data
> >> so
> >>> I
> >>>>> think it just waits 200ms or until the socket buffer is full.
> >>>>>
> >>>>> The current approach in the async producer captures the same
> >> tradeoff,
> >>>> but
> >>>>> a
> >>>>> little more flexibly, it allows you to specify a max delay and max
> >>>>> accumulated message count, data is written when either of those is
> >> hit.
> >>>>>
> >>>>> Is it possible to better capture this tradeoff? Basically I am not
> >>> aware
> >>>> of
> >>>>> any other trick here if you are using TCP, so i would be interested
> >> in
> >>>> what
> >>>>> zeromq does if they are doing this better.
> >>>>>
> >>>>> We do indeed write each message set to the filesystem as it arrives
> >> but
> >>>> we
> >>>>> distribute messages to consumers only after the write has been
> >> flushed
> >>> to
> >>>>> disk, delaying (batching) that flush is the cause of the latency but
> >>> also
> >>>>> gives better use of IOPs by generating larger writes. Mmap would
> >> remove
> >>>> the
> >>>>> system call (which would be good), but not the flush I think. As you
> >>> say,
> >>>>> adding replication allows giving stronger guarantees without actually
> >>>>> caring
> >>>>> about durability on a particular server which would make it possible
> >> to
> >>>>> distribute messages to consumers after ack from some number of other
> >>>>> servers
> >>>>> irrespective of flushing to disk.
> >>>>>
> >>>>> -Jay
> >>>>>
> >>>>> On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <psutter@quantbench.com
> >>>
> >>>>> wrote:
> >>>>>
> >>>>>> Jun
> >>>>>>
> >>>>>> Thanks for your answers and the link to the paper - that helps a
> >> lot,
> >>>>>> especially the comment in the paper that 10 second end to end
> >> latency
> >>>> is
> >>>>>> good enough for your intended use case.
> >>>>>>
> >>>>>> We're looking for much lower latencies, and the basic design of
> >> Kafka
> >>>>> feels
> >>>>>> like it should support latencies in milliseconds with a few
> >> changes.
> >>>>> We're
> >>>>>> either going to build our own system, or help develop something
> >> that
> >>>>>> already
> >>>>>> exists, so please take my comments in the constructive way they're
> >>>>> intended
> >>>>>> (I realize the changes I'm suggesting are outside your intended use
> >>>> case,
> >>>>>> but if you're interested we may be able to provide a very capable
> >>>>> developer
> >>>>>> to help with the work, assuming we choose kafka over the other
> >>> zillion
> >>>>>> streaming systems that are coming out of the woodwork).
> >>>>>>
> >>>>>> a. *Producer "queue.time"* - In my question 4 below, I was
> >> referring
> >>> to
> >>>>> the
> >>>>>> producer queue time.  With a default value of 5 seconds, that
> >>> accounts
> >>>>> for
> >>>>>> half your end to end latency. A system like zeromq is optimized to
> >>>> write
> >>>>>> data immediately without delay, but in such a way to minimizes the
> >>>> number
> >>>>>> of
> >>>>>> system calls required during high throughput messages. Zeromq is no
> >>>>>> nirvana,
> >>>>>> but it has a number of nice properties.
> >>>>>>
> >>>>>> b. *Broker "log.default.flush.interval.ms"* - The default value of
> >> 3
> >>>>>> seconds
> >>>>>> appears to be another significant source of latency in the system,
> >>>>> assuming
> >>>>>> that clients are unable to access data until it has been flushed.
> >>> Since
> >>>>> you
> >>>>>> have wisely chosen to take advantage of the buffer cache as part of
> >>>> your
> >>>>>> system design, it seems that you could remove this latency
> >> completely
> >>>> by
> >>>>>> memory mapping the partitions and memcpying each message as it
> >>> arrives.
> >>>>>> With
> >>>>>> the right IPC mechanism clients could have immediate access to new
> >>>>>> messages.
> >>>>>>
> >>>>>> c. *Batching, sync vs async, replication, and auditing*. Its
> >>>>> understandable
> >>>>>> that you've chosen a a forensic approach to producer reliability
> >>> (after
> >>>>> the
> >>>>>> fact auditing), but when you implement replication it would be
> >> really
> >>>>> nice
> >>>>>> to revise the producer protocol mechanisms. If you used a streaming
> >>>>>> mechanism with producer offsets and ACKs, you could ensure reliable
> >>>>>> delivery
> >>>>>> of producer streams to multiple brokers without the need to choose
> >> a
> >>>>> "batch
> >>>>>> size" or "queue.time". This could also give you active/active
> >>> failover
> >>>> of
> >>>>>> brokers. This may also help in the WAN case (my question 3 below)
> >>>> because
> >>>>>> you will be able to adaptively stuff more and more data through the
> >>>> fiber
> >>>>>> for high bandwidth*delay links without having to choose a large
> >>> "batch
> >>>>>> size"
> >>>>>> nor have the additional latency that entails. Oh, and it will help
> >>> you
> >>>>> deal
> >>>>>> with CRC errors once you start checking for them.
> >>>>>>
> >>>>>> c. *Performance measurements* - I'd like to make a suggestion for
> >>> your
> >>>>>> performance measurements. Your benchmarks measure throughput, but a
> >>>>>> throughput number is meaningless without an associated "% cpu
> >> time".
> >>>>>> Ideally
> >>>>>> all measurements achieve wire speed (100MB/sec) at 0% CPU (since,
> >>> after
> >>>>>> all,
> >>>>>> this is plumbing and we assume the cores in the system should have
> >>>>> capacity
> >>>>>> set aside for useful work too). Obviously nobody ever achieves
> >> this,
> >>>> but
> >>>>> by
> >>>>>> measuring it one can raise the bar in terms of optimization.
> >>>>>>
> >>>>>> Paul
> >>>>>>
> >>>>>> ps. Just for background, I am the cofounder at Quantcast where we
> >>>> process
> >>>>>> 3.5PB of data per day. These questions are related to my new
> >> startup
> >>>>>> Quantbench which will deal with financial market data where you
> >> dont
> >>>> want
> >>>>>> any latency at all. And WAN issues are a big deal too.
> >> Incidentally,
> >>> I
> >>>>> was
> >>>>>> also founder of Orbital Data which was a WAN optimization company
> >> so
> >>>> I've
> >>>>>> done a lot of work with protocols over long distances.
> >>>>>>
> >>>>>> On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <ju...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Paul,
> >>>>>>>
> >>>>>>> Excellent questions. See my answers below. Thanks,
> >>>>>>>
> >>>>>>> On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <
> >>> psutter@quantbench.com
> >>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Kafka looks like an exciting project, thanks for opening it up.
> >>>>>>>>
> >>>>>>>> I have a few questions:
> >>>>>>>>
> >>>>>>>> 1. Are checksums end to end (ie, created by the producer and
> >>>> checked
> >>>>> by
> >>>>>>> the
> >>>>>>>> consumer)? or are they only used to confirm buffercache
> >> behavior
> >>> on
> >>>>>> disk
> >>>>>>> as
> >>>>>>>> mentioned in the documentation? Bit errors occur vastly more
> >>> often
> >>>>> than
> >>>>>>>> most
> >>>>>>>> people assume, often because of device driver bugs. TCP only
> >>>> detects
> >>>>> 1
> >>>>>>>> error
> >>>>>>>> in 65536, so errors can flow through (if you like I can send
> >>> links
> >>>> to
> >>>>>>>> papers
> >>>>>>>> describing the need for checksums everywhere).
> >>>>>>>>
> >>>>>>>
> >>>>>>> Checksum is generated at the producer and propagated to the
> >> broker
> >>>> and
> >>>>>>> eventually the consumer. Currently, we only validate the checksum
> >>> at
> >>>>> the
> >>>>>>> broker. We could further validate it at the consumer in the
> >> future.
> >>>>>>>
> >>>>>>>>
> >>>>>>>> 2. The consumer has a pretty solid mechanism to ensure it hasnt
> >>>>> missed
> >>>>>>> any
> >>>>>>>> messages (i like the design by the way), but how does the
> >>> producer
> >>>>> know
> >>>>>>>> that
> >>>>>>>> all of its messages have been stored? (no apparent message id
> >> on
> >>>> that
> >>>>>>> side
> >>>>>>>> since the message id isnt known until the message is written to
> >>> the
> >>>>>>> file).
> >>>>>>>> I'm especially curious how failover/replication could be
> >>>> implemented
> >>>>>> and
> >>>>>>>> I'm
> >>>>>>>> thinking that acks on the publisher side may help)
> >>>>>>>>
> >>>>>>>
> >>>>>>> The producer side auditing is not built-in. At LinkedIn, we do
> >> that
> >>>> by
> >>>>>>> generating an auditing event periodically in the eventhandler of
> >>> the
> >>>>>> async
> >>>>>>> producer. The auditing event contains the number of events
> >> produced
> >>>> in
> >>>>> a
> >>>>>>> configured window (e.g., 10 minutes) and are sent to a separate
> >>>> topic.
> >>>>>> The
> >>>>>>> consumer can read the actual data and the auditing event and
> >>> compare
> >>>>> the
> >>>>>>> counts. See our paper (
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
> >>>>>>> )
> >>>>>>> for some more details.
> >>>>>>>
> >>>>>>>
> >>>>>>>>
> >>>>>>>> 3. Has the consumer's flow control been tested over high
> >>>>>> bandwidth*delay
> >>>>>>>> links? (what bandwidth can you get from a London consumer of an
> >>> SF
> >>>>>>>> cluster?)
> >>>>>>>>
> >>>>>>>> Yes, we actually replicate kafka data across data centers,
> >> using
> >>> an
> >>>>>>> embedded consumer in a broker. Again, there is a bit more info on
> >>>> this
> >>>>> in
> >>>>>>> our paper.
> >>>>>>>
> >>>>>>>
> >>>>>>>> 4. What kind of performance do you get if you set the
> >> producer's
> >>>>>> message
> >>>>>>>> delay to zero? (ie, is there a separate system call for each
> >>>> message?
> >>>>>> or
> >>>>>>> do
> >>>>>>>> you manage to aggregate messages into a smaller number of
> >> system
> >>>>> calls
> >>>>>>> even
> >>>>>>>> with a delay of 0?)
> >>>>>>>>
> >>>>>>>> I assume that you are referring to the flush interval. One can
> >>>>>> configure
> >>>>>>> to
> >>>>>>> flush every message to disk. This will slow down the throughput
> >>>>>>> significantly.
> >>>>>>>
> >>>>>>>
> >>>>>>>> 5. Have you considered using a library like zeromq for the
> >>>> messaging
> >>>>>>> layer
> >>>>>>>> instead of rolling your own? (zeromq will handle #4 cleanly at
> >>>>> millions
> >>>>>>> of
> >>>>>>>> messages per second and has clients in 20 languages)
> >>>>>>>>
> >>>>>>>> No. Our proprietary format allows us to support things like
> >>>>> compression
> >>>>>>> in
> >>>>>>> the future. However, we can definitely look into the zeromq
> >> format.
> >>>> Is
> >>>>>>> their
> >>>>>>> messaging layer easily extractable?
> >>>>>>>
> >>>>>>>
> >>>>>>>> 6. Do you have any plans to support intermediate processing
> >>>> elements
> >>>>>> the
> >>>>>>>> way
> >>>>>>>> Flume supports?
> >>>>>>>>
> >>>>>>>> For now, we are just focusing on getting the raw messaging
> >> layer
> >>>>> solid.
> >>>>>>> We
> >>>>>>> have worked a bit on streaming processing and will look into that
> >>>> again
> >>>>>> in
> >>>>>>> the future.
> >>>>>>>
> >>>>>>>
> >>>>>>>> 7. The docs mention that new versions will only be released
> >> after
> >>>>> they
> >>>>>>> are
> >>>>>>>> in production at LinkedIn? Does that mean that the latest
> >> version
> >>>> of
> >>>>>> the
> >>>>>>>> source code is hidden at LinkedIn and contributors would have
> >> to
> >>>>> throw
> >>>>>>>> patches over the wall and wait months to get the integrated
> >>>> product?
> >>>>>>>>
> >>>>>>>> What we ran at LinkedIn is the same version in open source and
> >>>> there
> >>>>> is
> >>>>>>> no
> >>>>>>> internal repository of Kafka at LinkedIn. We plan to maintain
> >> that
> >>> in
> >>>>> the
> >>>>>>> future.
> >>>>>>>
> >>>>>>>
> >>>>>>>> Thanks!
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>

Re: Kafka questions

Posted by Paul Sutter <ps...@quantbench.com>.
One more suggestion:

Even before you have replication, it seems that you could delay producer side acks until after the data is recorded to disk, and still pass the data forward to consumers immediately.



On Jul 19, 2011, at 12:23 PM, Jay Kreps <ja...@gmail.com> wrote:

> Agreed, no reason the policy to hand out messages should not be
> configurable. We were hoping to make the whole question irrelevant with the
> replication since then the producer can choose the replication level it
> wants and fsync durability should be less of a concern.
> 
> I agree with your comment that a good implementation of streaming with acks
> being potentially superior.
> 
> -Jay
> 
> On Tue, Jul 19, 2011 at 11:15 AM, Paul Sutter <ps...@quantbench.com>wrote:
> 
>> Jay,
>> 
>> Ah - thanks for the clarification on the delay in the broker. It would be
>> nice to if that were a configuration option, so that the end user can
>> choose
>> only to forward messages that have been written to disk, or choose to have
>> the data forwarded immediately. When you implement replication data hitting
>> the disk will matter less.
>> 
>> On the delay in the producer, I think it could best be resolved through
>> measurement. In your paper you compare two different approaches, and I'm
>> proposing a third:
>> 
>> 1. Send and wait (single message, JMS style)
>> 2. Batch, send, and wait (Kafka today)
>> 3. Stream with ACKs
>> 
>> Removing any wait for a reply should increase throughput, not decrease it,
>> so you're likely trading latency against potential CPU efficiency. And the
>> CPU savings is a question best resolved by measurement.
>> 
>> I'd also encourage you to think about the WAN case. When you send-and-wait,
>> you need to send a buffer that is >> the bandwidth delay product to
>> approach
>> full line utilization, and the line will go idle for one RTT while you stop
>> to wait for a reply. The bandwidth*delay product can get large (10s of
>> megabytes), and end users will rarely understand the need to tune the batch
>> size to increase throughput. They'll just say it's slow over long
>> distances.
>> 
>> All that said - your use case doesn't require minimizing latency or WAN
>> use,
>> so I can really understand if this isn't a priority for you.
>> 
>> It's a well designed product that has had some real thought put into it.
>> It's a really promising system, thanks for taking the time to respond to my
>> comments.
>> 
>> Paul
>> 
>> On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <ja...@gmail.com> wrote:
>> 
>>> Ah, I think what you are describing in zeromq is essentially the
>> equivalent
>>> of group commit for the socket. Essentially you wait until the socket is
>> no
>>> longer writable and then begin to queue data. This is an interesting
>> idea.
>>> Of course it would only have a positive effect when you had already
>>> overflowed the socket buffer and were sending a very high throughput of
>>> small messages. That basically is a way to degrade an overloaded
>>> synchronous
>>> send into a batched send. This is not really the same as what we have
>> done,
>>> which is to allow the ability to trade off latency for throughput in a
>>> configurable manner. The reason the later is important is that we do not
>>> have a handful of producers sending at a rate that saturates the network
>>> I/O
>>> capacity of those servers (the case where the group commit would help)
>> but
>>> rather we have thousands of producers sending at a medium low volume, so
>> we
>>> would never hit that in our use case. The advantage of batching is fewer
>>> requests that hit the server, and larger packets. Where the group commit
>>> would help is for the synchronous producer benchmarks, where you could
>>> potentially get much better throughput. This is something we should
>>> consider
>>> adding.
>>> 
>>> To be clear, though, we have not added latency in our layer, just made a
>>> configurable way to trade-off latency for throughput. This is
>> unambiguously
>>> a good thing, I think.
>>> 
>>> With respect to mmap, i think you are misunderstanding where the latency
>>> comes from. We immediately write data to the filesystem with no delay
>>> whatsoever. This incurs the overhead of a system call, as you point out,
>>> which could be avoided by mmap, but that doesn't add much in the way of
>>> latency. The latency comes from the fact that we do not make the written
>>> data available to consumers until we fsync the file to "ensure" the
>>> durability of consumed messages. The frequency of the fsync is
>>> configurable,
>>> anything either immediate or with a time or # messages threshold. This
>>> again
>>> trades latency for throughput.
>>> 
>>> -Jay
>>> 
>>> On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <psutter@quantbench.com
>>>> wrote:
>>> 
>>>> *Producer latency* - I'm not familiar with zeromq internals but my
>>>> understanding is that they send the first message(s) immediately and as
>>> TCP
>>>> queues up the data, it will eventually block as the send buffer fills,
>>> and
>>>> during this time messages can queue up, and thte net-net is that on
>>> average
>>>> the number of system calls is << the number of messages. The key is
>>> having
>>>> a
>>>> separate thread for network operations with very efficient thread
>>>> coordination.  Clearly Nagle is disabled (TCP_NODELAY) as Nagle is a
>>> blight
>>>> against humanity.
>>>> 
>>>> Having any sort of delay adds latency. If every developer thinks its OK
>>> to
>>>> add a little latency in his layer, pretty soon you end up with 10
>> second
>>>> end
>>>> to end latency.
>>>> 
>>>> Having an "accumulated message count" is also bad for WAN performance.
>> If
>>>> your "window size" is a set of delayed messages, the only way to deal
>>> with
>>>> a
>>>> large bandwidth*delay product is to delay a lot of messages, then send
>>>> them.
>>>> You can fit a lot of data into a fiber. Imagine a gigabit link with
>> 100ms
>>>> roundtrip time, you can store 100MB in the fiber. And you need a
>>> multiples
>>>> of that for buffering if you need to do a retransmit.
>>>> 
>>>> *Broker Latency *- With mmap the memcpy() of the message should make
>> the
>>>> data available to a thread even in another process, the pages that you
>>> have
>>>> mapped are also in the buffer cache and available to a sendfile() call.
>>> or
>>>> at least I think so. The flush to physical disk (or msync() in this
>> case)
>>>> would still be delayed but without impacting end to end latency.
>>>> 
>>>> That said, in benchmarks I have done the fastest IO with the lowest CPU
>>>> overhead is unbuffered (direct) IO (which is lower overhead than using
>>> the
>>>> buffer cache with or without memory mapping), but then you'd have to
>>> manage
>>>> your own buffer pool and run your broker in a single multithreaded
>>> process.
>>>> But thats getting more extreme. Just getting rid of this buffer write
>>> delay
>>>> by using memory mapping will remove a big chunk of latency.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <ja...@gmail.com>
>> wrote:
>>>> 
>>>>> Hi Paul,
>>>>> 
>>>>> We are definitely interested in lowering latency--lower is always
>>>>> better--but that was not a major concern for us so far (we were
>>> replacing
>>>> a
>>>>> system with 1 hour latency), so we haven't focused on it yet. As you
>>>>> describe latency in our setup at linkedin comes from batching on the
>>>>> frontend and batching on the kafka servers do to very lenient flush
>>>>> settings.
>>>>> 
>>>>> I am interested in your comments on zeromq. Do they actually have a
>>>> better
>>>>> approach for this problem even when using TCP? If so I would be
>>>> interested
>>>>> to understand. The way I see things this is about trading throughput
>>> and
>>>>> latency. On the producer side you have only a few options:
>> immediately
>>>>> write
>>>>> the data to the socket buffer for sending or wait and see if the
>>>>> application
>>>>> writes more data. The OS will do this for you unless you set
>>> TCP_NODELAY,
>>>>> but the OS is relatively inflexible, it doesn't understand your data
>> so
>>> I
>>>>> think it just waits 200ms or until the socket buffer is full.
>>>>> 
>>>>> The current approach in the async producer captures the same
>> tradeoff,
>>>> but
>>>>> a
>>>>> little more flexibly, it allows you to specify a max delay and max
>>>>> accumulated message count, data is written when either of those is
>> hit.
>>>>> 
>>>>> Is it possible to better capture this tradeoff? Basically I am not
>>> aware
>>>> of
>>>>> any other trick here if you are using TCP, so i would be interested
>> in
>>>> what
>>>>> zeromq does if they are doing this better.
>>>>> 
>>>>> We do indeed write each message set to the filesystem as it arrives
>> but
>>>> we
>>>>> distribute messages to consumers only after the write has been
>> flushed
>>> to
>>>>> disk, delaying (batching) that flush is the cause of the latency but
>>> also
>>>>> gives better use of IOPs by generating larger writes. Mmap would
>> remove
>>>> the
>>>>> system call (which would be good), but not the flush I think. As you
>>> say,
>>>>> adding replication allows giving stronger guarantees without actually
>>>>> caring
>>>>> about durability on a particular server which would make it possible
>> to
>>>>> distribute messages to consumers after ack from some number of other
>>>>> servers
>>>>> irrespective of flushing to disk.
>>>>> 
>>>>> -Jay
>>>>> 
>>>>> On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <psutter@quantbench.com
>>> 
>>>>> wrote:
>>>>> 
>>>>>> Jun
>>>>>> 
>>>>>> Thanks for your answers and the link to the paper - that helps a
>> lot,
>>>>>> especially the comment in the paper that 10 second end to end
>> latency
>>>> is
>>>>>> good enough for your intended use case.
>>>>>> 
>>>>>> We're looking for much lower latencies, and the basic design of
>> Kafka
>>>>> feels
>>>>>> like it should support latencies in milliseconds with a few
>> changes.
>>>>> We're
>>>>>> either going to build our own system, or help develop something
>> that
>>>>>> already
>>>>>> exists, so please take my comments in the constructive way they're
>>>>> intended
>>>>>> (I realize the changes I'm suggesting are outside your intended use
>>>> case,
>>>>>> but if you're interested we may be able to provide a very capable
>>>>> developer
>>>>>> to help with the work, assuming we choose kafka over the other
>>> zillion
>>>>>> streaming systems that are coming out of the woodwork).
>>>>>> 
>>>>>> a. *Producer "queue.time"* - In my question 4 below, I was
>> referring
>>> to
>>>>> the
>>>>>> producer queue time.  With a default value of 5 seconds, that
>>> accounts
>>>>> for
>>>>>> half your end to end latency. A system like zeromq is optimized to
>>>> write
>>>>>> data immediately without delay, but in such a way to minimizes the
>>>> number
>>>>>> of
>>>>>> system calls required during high throughput messages. Zeromq is no
>>>>>> nirvana,
>>>>>> but it has a number of nice properties.
>>>>>> 
>>>>>> b. *Broker "log.default.flush.interval.ms"* - The default value of
>> 3
>>>>>> seconds
>>>>>> appears to be another significant source of latency in the system,
>>>>> assuming
>>>>>> that clients are unable to access data until it has been flushed.
>>> Since
>>>>> you
>>>>>> have wisely chosen to take advantage of the buffer cache as part of
>>>> your
>>>>>> system design, it seems that you could remove this latency
>> completely
>>>> by
>>>>>> memory mapping the partitions and memcpying each message as it
>>> arrives.
>>>>>> With
>>>>>> the right IPC mechanism clients could have immediate access to new
>>>>>> messages.
>>>>>> 
>>>>>> c. *Batching, sync vs async, replication, and auditing*. Its
>>>>> understandable
>>>>>> that you've chosen a a forensic approach to producer reliability
>>> (after
>>>>> the
>>>>>> fact auditing), but when you implement replication it would be
>> really
>>>>> nice
>>>>>> to revise the producer protocol mechanisms. If you used a streaming
>>>>>> mechanism with producer offsets and ACKs, you could ensure reliable
>>>>>> delivery
>>>>>> of producer streams to multiple brokers without the need to choose
>> a
>>>>> "batch
>>>>>> size" or "queue.time". This could also give you active/active
>>> failover
>>>> of
>>>>>> brokers. This may also help in the WAN case (my question 3 below)
>>>> because
>>>>>> you will be able to adaptively stuff more and more data through the
>>>> fiber
>>>>>> for high bandwidth*delay links without having to choose a large
>>> "batch
>>>>>> size"
>>>>>> nor have the additional latency that entails. Oh, and it will help
>>> you
>>>>> deal
>>>>>> with CRC errors once you start checking for them.
>>>>>> 
>>>>>> c. *Performance measurements* - I'd like to make a suggestion for
>>> your
>>>>>> performance measurements. Your benchmarks measure throughput, but a
>>>>>> throughput number is meaningless without an associated "% cpu
>> time".
>>>>>> Ideally
>>>>>> all measurements achieve wire speed (100MB/sec) at 0% CPU (since,
>>> after
>>>>>> all,
>>>>>> this is plumbing and we assume the cores in the system should have
>>>>> capacity
>>>>>> set aside for useful work too). Obviously nobody ever achieves
>> this,
>>>> but
>>>>> by
>>>>>> measuring it one can raise the bar in terms of optimization.
>>>>>> 
>>>>>> Paul
>>>>>> 
>>>>>> ps. Just for background, I am the cofounder at Quantcast where we
>>>> process
>>>>>> 3.5PB of data per day. These questions are related to my new
>> startup
>>>>>> Quantbench which will deal with financial market data where you
>> dont
>>>> want
>>>>>> any latency at all. And WAN issues are a big deal too.
>> Incidentally,
>>> I
>>>>> was
>>>>>> also founder of Orbital Data which was a WAN optimization company
>> so
>>>> I've
>>>>>> done a lot of work with protocols over long distances.
>>>>>> 
>>>>>> On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <ju...@gmail.com> wrote:
>>>>>> 
>>>>>>> Paul,
>>>>>>> 
>>>>>>> Excellent questions. See my answers below. Thanks,
>>>>>>> 
>>>>>>> On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <
>>> psutter@quantbench.com
>>>>> 
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Kafka looks like an exciting project, thanks for opening it up.
>>>>>>>> 
>>>>>>>> I have a few questions:
>>>>>>>> 
>>>>>>>> 1. Are checksums end to end (ie, created by the producer and
>>>> checked
>>>>> by
>>>>>>> the
>>>>>>>> consumer)? or are they only used to confirm buffercache
>> behavior
>>> on
>>>>>> disk
>>>>>>> as
>>>>>>>> mentioned in the documentation? Bit errors occur vastly more
>>> often
>>>>> than
>>>>>>>> most
>>>>>>>> people assume, often because of device driver bugs. TCP only
>>>> detects
>>>>> 1
>>>>>>>> error
>>>>>>>> in 65536, so errors can flow through (if you like I can send
>>> links
>>>> to
>>>>>>>> papers
>>>>>>>> describing the need for checksums everywhere).
>>>>>>>> 
>>>>>>> 
>>>>>>> Checksum is generated at the producer and propagated to the
>> broker
>>>> and
>>>>>>> eventually the consumer. Currently, we only validate the checksum
>>> at
>>>>> the
>>>>>>> broker. We could further validate it at the consumer in the
>> future.
>>>>>>> 
>>>>>>>> 
>>>>>>>> 2. The consumer has a pretty solid mechanism to ensure it hasnt
>>>>> missed
>>>>>>> any
>>>>>>>> messages (i like the design by the way), but how does the
>>> producer
>>>>> know
>>>>>>>> that
>>>>>>>> all of its messages have been stored? (no apparent message id
>> on
>>>> that
>>>>>>> side
>>>>>>>> since the message id isnt known until the message is written to
>>> the
>>>>>>> file).
>>>>>>>> I'm especially curious how failover/replication could be
>>>> implemented
>>>>>> and
>>>>>>>> I'm
>>>>>>>> thinking that acks on the publisher side may help)
>>>>>>>> 
>>>>>>> 
>>>>>>> The producer side auditing is not built-in. At LinkedIn, we do
>> that
>>>> by
>>>>>>> generating an auditing event periodically in the eventhandler of
>>> the
>>>>>> async
>>>>>>> producer. The auditing event contains the number of events
>> produced
>>>> in
>>>>> a
>>>>>>> configured window (e.g., 10 minutes) and are sent to a separate
>>>> topic.
>>>>>> The
>>>>>>> consumer can read the actual data and the auditing event and
>>> compare
>>>>> the
>>>>>>> counts. See our paper (
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
>>>>>>> )
>>>>>>> for some more details.
>>>>>>> 
>>>>>>> 
>>>>>>>> 
>>>>>>>> 3. Has the consumer's flow control been tested over high
>>>>>> bandwidth*delay
>>>>>>>> links? (what bandwidth can you get from a London consumer of an
>>> SF
>>>>>>>> cluster?)
>>>>>>>> 
>>>>>>>> Yes, we actually replicate kafka data across data centers,
>> using
>>> an
>>>>>>> embedded consumer in a broker. Again, there is a bit more info on
>>>> this
>>>>> in
>>>>>>> our paper.
>>>>>>> 
>>>>>>> 
>>>>>>>> 4. What kind of performance do you get if you set the
>> producer's
>>>>>> message
>>>>>>>> delay to zero? (ie, is there a separate system call for each
>>>> message?
>>>>>> or
>>>>>>> do
>>>>>>>> you manage to aggregate messages into a smaller number of
>> system
>>>>> calls
>>>>>>> even
>>>>>>>> with a delay of 0?)
>>>>>>>> 
>>>>>>>> I assume that you are referring to the flush interval. One can
>>>>>> configure
>>>>>>> to
>>>>>>> flush every message to disk. This will slow down the throughput
>>>>>>> significantly.
>>>>>>> 
>>>>>>> 
>>>>>>>> 5. Have you considered using a library like zeromq for the
>>>> messaging
>>>>>>> layer
>>>>>>>> instead of rolling your own? (zeromq will handle #4 cleanly at
>>>>> millions
>>>>>>> of
>>>>>>>> messages per second and has clients in 20 languages)
>>>>>>>> 
>>>>>>>> No. Our proprietary format allows us to support things like
>>>>> compression
>>>>>>> in
>>>>>>> the future. However, we can definitely look into the zeromq
>> format.
>>>> Is
>>>>>>> their
>>>>>>> messaging layer easily extractable?
>>>>>>> 
>>>>>>> 
>>>>>>>> 6. Do you have any plans to support intermediate processing
>>>> elements
>>>>>> the
>>>>>>>> way
>>>>>>>> Flume supports?
>>>>>>>> 
>>>>>>>> For now, we are just focusing on getting the raw messaging
>> layer
>>>>> solid.
>>>>>>> We
>>>>>>> have worked a bit on streaming processing and will look into that
>>>> again
>>>>>> in
>>>>>>> the future.
>>>>>>> 
>>>>>>> 
>>>>>>>> 7. The docs mention that new versions will only be released
>> after
>>>>> they
>>>>>>> are
>>>>>>>> in production at LinkedIn? Does that mean that the latest
>> version
>>>> of
>>>>>> the
>>>>>>>> source code is hidden at LinkedIn and contributors would have
>> to
>>>>> throw
>>>>>>>> patches over the wall and wait months to get the integrated
>>>> product?
>>>>>>>> 
>>>>>>>> What we ran at LinkedIn is the same version in open source and
>>>> there
>>>>> is
>>>>>>> no
>>>>>>> internal repository of Kafka at LinkedIn. We plan to maintain
>> that
>>> in
>>>>> the
>>>>>>> future.
>>>>>>> 
>>>>>>> 
>>>>>>>> Thanks!
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 

Re: Kafka questions

Posted by Jay Kreps <ja...@gmail.com>.
Agreed, no reason the policy to hand out messages should not be
configurable. We were hoping to make the whole question irrelevant with the
replication since then the producer can choose the replication level it
wants and fsync durability should be less of a concern.

I agree with your comment that a good implementation of streaming with acks
being potentially superior.

-Jay

On Tue, Jul 19, 2011 at 11:15 AM, Paul Sutter <ps...@quantbench.com>wrote:

> Jay,
>
> Ah - thanks for the clarification on the delay in the broker. It would be
> nice to if that were a configuration option, so that the end user can
> choose
> only to forward messages that have been written to disk, or choose to have
> the data forwarded immediately. When you implement replication data hitting
> the disk will matter less.
>
> On the delay in the producer, I think it could best be resolved through
> measurement. In your paper you compare two different approaches, and I'm
> proposing a third:
>
> 1. Send and wait (single message, JMS style)
> 2. Batch, send, and wait (Kafka today)
> 3. Stream with ACKs
>
> Removing any wait for a reply should increase throughput, not decrease it,
> so you're likely trading latency against potential CPU efficiency. And the
> CPU savings is a question best resolved by measurement.
>
> I'd also encourage you to think about the WAN case. When you send-and-wait,
> you need to send a buffer that is >> the bandwidth delay product to
> approach
> full line utilization, and the line will go idle for one RTT while you stop
> to wait for a reply. The bandwidth*delay product can get large (10s of
> megabytes), and end users will rarely understand the need to tune the batch
> size to increase throughput. They'll just say it's slow over long
> distances.
>
> All that said - your use case doesn't require minimizing latency or WAN
> use,
> so I can really understand if this isn't a priority for you.
>
> It's a well designed product that has had some real thought put into it.
> It's a really promising system, thanks for taking the time to respond to my
> comments.
>
> Paul
>
> On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Ah, I think what you are describing in zeromq is essentially the
> equivalent
> > of group commit for the socket. Essentially you wait until the socket is
> no
> > longer writable and then begin to queue data. This is an interesting
> idea.
> > Of course it would only have a positive effect when you had already
> > overflowed the socket buffer and were sending a very high throughput of
> > small messages. That basically is a way to degrade an overloaded
> > synchronous
> > send into a batched send. This is not really the same as what we have
> done,
> > which is to allow the ability to trade off latency for throughput in a
> > configurable manner. The reason the later is important is that we do not
> > have a handful of producers sending at a rate that saturates the network
> > I/O
> > capacity of those servers (the case where the group commit would help)
> but
> > rather we have thousands of producers sending at a medium low volume, so
> we
> > would never hit that in our use case. The advantage of batching is fewer
> > requests that hit the server, and larger packets. Where the group commit
> > would help is for the synchronous producer benchmarks, where you could
> > potentially get much better throughput. This is something we should
> > consider
> > adding.
> >
> > To be clear, though, we have not added latency in our layer, just made a
> > configurable way to trade-off latency for throughput. This is
> unambiguously
> > a good thing, I think.
> >
> > With respect to mmap, i think you are misunderstanding where the latency
> > comes from. We immediately write data to the filesystem with no delay
> > whatsoever. This incurs the overhead of a system call, as you point out,
> > which could be avoided by mmap, but that doesn't add much in the way of
> > latency. The latency comes from the fact that we do not make the written
> > data available to consumers until we fsync the file to "ensure" the
> > durability of consumed messages. The frequency of the fsync is
> > configurable,
> > anything either immediate or with a time or # messages threshold. This
> > again
> > trades latency for throughput.
> >
> > -Jay
> >
> > On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <psutter@quantbench.com
> > >wrote:
> >
> > > *Producer latency* - I'm not familiar with zeromq internals but my
> > > understanding is that they send the first message(s) immediately and as
> > TCP
> > > queues up the data, it will eventually block as the send buffer fills,
> > and
> > > during this time messages can queue up, and thte net-net is that on
> > average
> > > the number of system calls is << the number of messages. The key is
> > having
> > > a
> > > separate thread for network operations with very efficient thread
> > > coordination.  Clearly Nagle is disabled (TCP_NODELAY) as Nagle is a
> > blight
> > > against humanity.
> > >
> > > Having any sort of delay adds latency. If every developer thinks its OK
> > to
> > > add a little latency in his layer, pretty soon you end up with 10
> second
> > > end
> > > to end latency.
> > >
> > > Having an "accumulated message count" is also bad for WAN performance.
> If
> > > your "window size" is a set of delayed messages, the only way to deal
> > with
> > > a
> > > large bandwidth*delay product is to delay a lot of messages, then send
> > > them.
> > > You can fit a lot of data into a fiber. Imagine a gigabit link with
> 100ms
> > > roundtrip time, you can store 100MB in the fiber. And you need a
> > multiples
> > > of that for buffering if you need to do a retransmit.
> > >
> > > *Broker Latency *- With mmap the memcpy() of the message should make
> the
> > > data available to a thread even in another process, the pages that you
> > have
> > > mapped are also in the buffer cache and available to a sendfile() call.
> > or
> > > at least I think so. The flush to physical disk (or msync() in this
> case)
> > > would still be delayed but without impacting end to end latency.
> > >
> > > That said, in benchmarks I have done the fastest IO with the lowest CPU
> > > overhead is unbuffered (direct) IO (which is lower overhead than using
> > the
> > > buffer cache with or without memory mapping), but then you'd have to
> > manage
> > > your own buffer pool and run your broker in a single multithreaded
> > process.
> > > But thats getting more extreme. Just getting rid of this buffer write
> > delay
> > > by using memory mapping will remove a big chunk of latency.
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <ja...@gmail.com>
> wrote:
> > >
> > > > Hi Paul,
> > > >
> > > > We are definitely interested in lowering latency--lower is always
> > > > better--but that was not a major concern for us so far (we were
> > replacing
> > > a
> > > > system with 1 hour latency), so we haven't focused on it yet. As you
> > > > describe latency in our setup at linkedin comes from batching on the
> > > > frontend and batching on the kafka servers do to very lenient flush
> > > > settings.
> > > >
> > > > I am interested in your comments on zeromq. Do they actually have a
> > > better
> > > > approach for this problem even when using TCP? If so I would be
> > > interested
> > > > to understand. The way I see things this is about trading throughput
> > and
> > > > latency. On the producer side you have only a few options:
> immediately
> > > > write
> > > > the data to the socket buffer for sending or wait and see if the
> > > > application
> > > > writes more data. The OS will do this for you unless you set
> > TCP_NODELAY,
> > > > but the OS is relatively inflexible, it doesn't understand your data
> so
> > I
> > > > think it just waits 200ms or until the socket buffer is full.
> > > >
> > > > The current approach in the async producer captures the same
> tradeoff,
> > > but
> > > > a
> > > > little more flexibly, it allows you to specify a max delay and max
> > > > accumulated message count, data is written when either of those is
> hit.
> > > >
> > > > Is it possible to better capture this tradeoff? Basically I am not
> > aware
> > > of
> > > > any other trick here if you are using TCP, so i would be interested
> in
> > > what
> > > > zeromq does if they are doing this better.
> > > >
> > > > We do indeed write each message set to the filesystem as it arrives
> but
> > > we
> > > > distribute messages to consumers only after the write has been
> flushed
> > to
> > > > disk, delaying (batching) that flush is the cause of the latency but
> > also
> > > > gives better use of IOPs by generating larger writes. Mmap would
> remove
> > > the
> > > > system call (which would be good), but not the flush I think. As you
> > say,
> > > > adding replication allows giving stronger guarantees without actually
> > > > caring
> > > > about durability on a particular server which would make it possible
> to
> > > > distribute messages to consumers after ack from some number of other
> > > > servers
> > > > irrespective of flushing to disk.
> > > >
> > > > -Jay
> > > >
> > > > On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <psutter@quantbench.com
> >
> > > > wrote:
> > > >
> > > > > Jun
> > > > >
> > > > > Thanks for your answers and the link to the paper - that helps a
> lot,
> > > > > especially the comment in the paper that 10 second end to end
> latency
> > > is
> > > > > good enough for your intended use case.
> > > > >
> > > > > We're looking for much lower latencies, and the basic design of
> Kafka
> > > > feels
> > > > > like it should support latencies in milliseconds with a few
> changes.
> > > > We're
> > > > > either going to build our own system, or help develop something
> that
> > > > > already
> > > > > exists, so please take my comments in the constructive way they're
> > > > intended
> > > > > (I realize the changes I'm suggesting are outside your intended use
> > > case,
> > > > > but if you're interested we may be able to provide a very capable
> > > > developer
> > > > > to help with the work, assuming we choose kafka over the other
> > zillion
> > > > > streaming systems that are coming out of the woodwork).
> > > > >
> > > > > a. *Producer "queue.time"* - In my question 4 below, I was
> referring
> > to
> > > > the
> > > > > producer queue time.  With a default value of 5 seconds, that
> > accounts
> > > > for
> > > > > half your end to end latency. A system like zeromq is optimized to
> > > write
> > > > > data immediately without delay, but in such a way to minimizes the
> > > number
> > > > > of
> > > > > system calls required during high throughput messages. Zeromq is no
> > > > > nirvana,
> > > > > but it has a number of nice properties.
> > > > >
> > > > > b. *Broker "log.default.flush.interval.ms"* - The default value of
> 3
> > > > > seconds
> > > > > appears to be another significant source of latency in the system,
> > > > assuming
> > > > > that clients are unable to access data until it has been flushed.
> > Since
> > > > you
> > > > > have wisely chosen to take advantage of the buffer cache as part of
> > > your
> > > > > system design, it seems that you could remove this latency
> completely
> > > by
> > > > > memory mapping the partitions and memcpying each message as it
> > arrives.
> > > > > With
> > > > > the right IPC mechanism clients could have immediate access to new
> > > > > messages.
> > > > >
> > > > > c. *Batching, sync vs async, replication, and auditing*. Its
> > > > understandable
> > > > > that you've chosen a a forensic approach to producer reliability
> > (after
> > > > the
> > > > > fact auditing), but when you implement replication it would be
> really
> > > > nice
> > > > > to revise the producer protocol mechanisms. If you used a streaming
> > > > > mechanism with producer offsets and ACKs, you could ensure reliable
> > > > > delivery
> > > > > of producer streams to multiple brokers without the need to choose
> a
> > > > "batch
> > > > > size" or "queue.time". This could also give you active/active
> > failover
> > > of
> > > > > brokers. This may also help in the WAN case (my question 3 below)
> > > because
> > > > > you will be able to adaptively stuff more and more data through the
> > > fiber
> > > > > for high bandwidth*delay links without having to choose a large
> > "batch
> > > > > size"
> > > > > nor have the additional latency that entails. Oh, and it will help
> > you
> > > > deal
> > > > > with CRC errors once you start checking for them.
> > > > >
> > > > > c. *Performance measurements* - I'd like to make a suggestion for
> > your
> > > > > performance measurements. Your benchmarks measure throughput, but a
> > > > > throughput number is meaningless without an associated "% cpu
> time".
> > > > > Ideally
> > > > > all measurements achieve wire speed (100MB/sec) at 0% CPU (since,
> > after
> > > > > all,
> > > > > this is plumbing and we assume the cores in the system should have
> > > > capacity
> > > > > set aside for useful work too). Obviously nobody ever achieves
> this,
> > > but
> > > > by
> > > > > measuring it one can raise the bar in terms of optimization.
> > > > >
> > > > > Paul
> > > > >
> > > > > ps. Just for background, I am the cofounder at Quantcast where we
> > > process
> > > > > 3.5PB of data per day. These questions are related to my new
> startup
> > > > > Quantbench which will deal with financial market data where you
> dont
> > > want
> > > > > any latency at all. And WAN issues are a big deal too.
> Incidentally,
> > I
> > > > was
> > > > > also founder of Orbital Data which was a WAN optimization company
> so
> > > I've
> > > > > done a lot of work with protocols over long distances.
> > > > >
> > > > > On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <ju...@gmail.com> wrote:
> > > > >
> > > > > > Paul,
> > > > > >
> > > > > > Excellent questions. See my answers below. Thanks,
> > > > > >
> > > > > > On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <
> > psutter@quantbench.com
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Kafka looks like an exciting project, thanks for opening it up.
> > > > > > >
> > > > > > > I have a few questions:
> > > > > > >
> > > > > > > 1. Are checksums end to end (ie, created by the producer and
> > > checked
> > > > by
> > > > > > the
> > > > > > > consumer)? or are they only used to confirm buffercache
> behavior
> > on
> > > > > disk
> > > > > > as
> > > > > > > mentioned in the documentation? Bit errors occur vastly more
> > often
> > > > than
> > > > > > > most
> > > > > > > people assume, often because of device driver bugs. TCP only
> > > detects
> > > > 1
> > > > > > > error
> > > > > > > in 65536, so errors can flow through (if you like I can send
> > links
> > > to
> > > > > > > papers
> > > > > > > describing the need for checksums everywhere).
> > > > > > >
> > > > > >
> > > > > > Checksum is generated at the producer and propagated to the
> broker
> > > and
> > > > > > eventually the consumer. Currently, we only validate the checksum
> > at
> > > > the
> > > > > > broker. We could further validate it at the consumer in the
> future.
> > > > > >
> > > > > > >
> > > > > > > 2. The consumer has a pretty solid mechanism to ensure it hasnt
> > > > missed
> > > > > > any
> > > > > > > messages (i like the design by the way), but how does the
> > producer
> > > > know
> > > > > > > that
> > > > > > > all of its messages have been stored? (no apparent message id
> on
> > > that
> > > > > > side
> > > > > > > since the message id isnt known until the message is written to
> > the
> > > > > > file).
> > > > > > > I'm especially curious how failover/replication could be
> > > implemented
> > > > > and
> > > > > > > I'm
> > > > > > > thinking that acks on the publisher side may help)
> > > > > > >
> > > > > >
> > > > > > The producer side auditing is not built-in. At LinkedIn, we do
> that
> > > by
> > > > > > generating an auditing event periodically in the eventhandler of
> > the
> > > > > async
> > > > > > producer. The auditing event contains the number of events
> produced
> > > in
> > > > a
> > > > > > configured window (e.g., 10 minutes) and are sent to a separate
> > > topic.
> > > > > The
> > > > > > consumer can read the actual data and the auditing event and
> > compare
> > > > the
> > > > > > counts. See our paper (
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
> > > > > > )
> > > > > > for some more details.
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > 3. Has the consumer's flow control been tested over high
> > > > > bandwidth*delay
> > > > > > > links? (what bandwidth can you get from a London consumer of an
> > SF
> > > > > > > cluster?)
> > > > > > >
> > > > > > > Yes, we actually replicate kafka data across data centers,
> using
> > an
> > > > > > embedded consumer in a broker. Again, there is a bit more info on
> > > this
> > > > in
> > > > > > our paper.
> > > > > >
> > > > > >
> > > > > > > 4. What kind of performance do you get if you set the
> producer's
> > > > > message
> > > > > > > delay to zero? (ie, is there a separate system call for each
> > > message?
> > > > > or
> > > > > > do
> > > > > > > you manage to aggregate messages into a smaller number of
> system
> > > > calls
> > > > > > even
> > > > > > > with a delay of 0?)
> > > > > > >
> > > > > > > I assume that you are referring to the flush interval. One can
> > > > > configure
> > > > > > to
> > > > > > flush every message to disk. This will slow down the throughput
> > > > > > significantly.
> > > > > >
> > > > > >
> > > > > > > 5. Have you considered using a library like zeromq for the
> > > messaging
> > > > > > layer
> > > > > > > instead of rolling your own? (zeromq will handle #4 cleanly at
> > > > millions
> > > > > > of
> > > > > > > messages per second and has clients in 20 languages)
> > > > > > >
> > > > > > > No. Our proprietary format allows us to support things like
> > > > compression
> > > > > > in
> > > > > > the future. However, we can definitely look into the zeromq
> format.
> > > Is
> > > > > > their
> > > > > > messaging layer easily extractable?
> > > > > >
> > > > > >
> > > > > > > 6. Do you have any plans to support intermediate processing
> > > elements
> > > > > the
> > > > > > > way
> > > > > > > Flume supports?
> > > > > > >
> > > > > > > For now, we are just focusing on getting the raw messaging
> layer
> > > > solid.
> > > > > > We
> > > > > > have worked a bit on streaming processing and will look into that
> > > again
> > > > > in
> > > > > > the future.
> > > > > >
> > > > > >
> > > > > > > 7. The docs mention that new versions will only be released
> after
> > > > they
> > > > > > are
> > > > > > > in production at LinkedIn? Does that mean that the latest
> version
> > > of
> > > > > the
> > > > > > > source code is hidden at LinkedIn and contributors would have
> to
> > > > throw
> > > > > > > patches over the wall and wait months to get the integrated
> > > product?
> > > > > > >
> > > > > > > What we ran at LinkedIn is the same version in open source and
> > > there
> > > > is
> > > > > > no
> > > > > > internal repository of Kafka at LinkedIn. We plan to maintain
> that
> > in
> > > > the
> > > > > > future.
> > > > > >
> > > > > >
> > > > > > > Thanks!
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Kafka questions

Posted by Paul Sutter <ps...@quantbench.com>.
Jay,

Ah - thanks for the clarification on the delay in the broker. It would be
nice to if that were a configuration option, so that the end user can choose
only to forward messages that have been written to disk, or choose to have
the data forwarded immediately. When you implement replication data hitting
the disk will matter less.

On the delay in the producer, I think it could best be resolved through
measurement. In your paper you compare two different approaches, and I'm
proposing a third:

1. Send and wait (single message, JMS style)
2. Batch, send, and wait (Kafka today)
3. Stream with ACKs

Removing any wait for a reply should increase throughput, not decrease it,
so you're likely trading latency against potential CPU efficiency. And the
CPU savings is a question best resolved by measurement.

I'd also encourage you to think about the WAN case. When you send-and-wait,
you need to send a buffer that is >> the bandwidth delay product to approach
full line utilization, and the line will go idle for one RTT while you stop
to wait for a reply. The bandwidth*delay product can get large (10s of
megabytes), and end users will rarely understand the need to tune the batch
size to increase throughput. They'll just say it's slow over long distances.

All that said - your use case doesn't require minimizing latency or WAN use,
so I can really understand if this isn't a priority for you.

It's a well designed product that has had some real thought put into it.
It's a really promising system, thanks for taking the time to respond to my
comments.

Paul

On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <ja...@gmail.com> wrote:

> Ah, I think what you are describing in zeromq is essentially the equivalent
> of group commit for the socket. Essentially you wait until the socket is no
> longer writable and then begin to queue data. This is an interesting idea.
> Of course it would only have a positive effect when you had already
> overflowed the socket buffer and were sending a very high throughput of
> small messages. That basically is a way to degrade an overloaded
> synchronous
> send into a batched send. This is not really the same as what we have done,
> which is to allow the ability to trade off latency for throughput in a
> configurable manner. The reason the later is important is that we do not
> have a handful of producers sending at a rate that saturates the network
> I/O
> capacity of those servers (the case where the group commit would help) but
> rather we have thousands of producers sending at a medium low volume, so we
> would never hit that in our use case. The advantage of batching is fewer
> requests that hit the server, and larger packets. Where the group commit
> would help is for the synchronous producer benchmarks, where you could
> potentially get much better throughput. This is something we should
> consider
> adding.
>
> To be clear, though, we have not added latency in our layer, just made a
> configurable way to trade-off latency for throughput. This is unambiguously
> a good thing, I think.
>
> With respect to mmap, i think you are misunderstanding where the latency
> comes from. We immediately write data to the filesystem with no delay
> whatsoever. This incurs the overhead of a system call, as you point out,
> which could be avoided by mmap, but that doesn't add much in the way of
> latency. The latency comes from the fact that we do not make the written
> data available to consumers until we fsync the file to "ensure" the
> durability of consumed messages. The frequency of the fsync is
> configurable,
> anything either immediate or with a time or # messages threshold. This
> again
> trades latency for throughput.
>
> -Jay
>
> On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <psutter@quantbench.com
> >wrote:
>
> > *Producer latency* - I'm not familiar with zeromq internals but my
> > understanding is that they send the first message(s) immediately and as
> TCP
> > queues up the data, it will eventually block as the send buffer fills,
> and
> > during this time messages can queue up, and thte net-net is that on
> average
> > the number of system calls is << the number of messages. The key is
> having
> > a
> > separate thread for network operations with very efficient thread
> > coordination.  Clearly Nagle is disabled (TCP_NODELAY) as Nagle is a
> blight
> > against humanity.
> >
> > Having any sort of delay adds latency. If every developer thinks its OK
> to
> > add a little latency in his layer, pretty soon you end up with 10 second
> > end
> > to end latency.
> >
> > Having an "accumulated message count" is also bad for WAN performance. If
> > your "window size" is a set of delayed messages, the only way to deal
> with
> > a
> > large bandwidth*delay product is to delay a lot of messages, then send
> > them.
> > You can fit a lot of data into a fiber. Imagine a gigabit link with 100ms
> > roundtrip time, you can store 100MB in the fiber. And you need a
> multiples
> > of that for buffering if you need to do a retransmit.
> >
> > *Broker Latency *- With mmap the memcpy() of the message should make the
> > data available to a thread even in another process, the pages that you
> have
> > mapped are also in the buffer cache and available to a sendfile() call.
> or
> > at least I think so. The flush to physical disk (or msync() in this case)
> > would still be delayed but without impacting end to end latency.
> >
> > That said, in benchmarks I have done the fastest IO with the lowest CPU
> > overhead is unbuffered (direct) IO (which is lower overhead than using
> the
> > buffer cache with or without memory mapping), but then you'd have to
> manage
> > your own buffer pool and run your broker in a single multithreaded
> process.
> > But thats getting more extreme. Just getting rid of this buffer write
> delay
> > by using memory mapping will remove a big chunk of latency.
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > Hi Paul,
> > >
> > > We are definitely interested in lowering latency--lower is always
> > > better--but that was not a major concern for us so far (we were
> replacing
> > a
> > > system with 1 hour latency), so we haven't focused on it yet. As you
> > > describe latency in our setup at linkedin comes from batching on the
> > > frontend and batching on the kafka servers do to very lenient flush
> > > settings.
> > >
> > > I am interested in your comments on zeromq. Do they actually have a
> > better
> > > approach for this problem even when using TCP? If so I would be
> > interested
> > > to understand. The way I see things this is about trading throughput
> and
> > > latency. On the producer side you have only a few options: immediately
> > > write
> > > the data to the socket buffer for sending or wait and see if the
> > > application
> > > writes more data. The OS will do this for you unless you set
> TCP_NODELAY,
> > > but the OS is relatively inflexible, it doesn't understand your data so
> I
> > > think it just waits 200ms or until the socket buffer is full.
> > >
> > > The current approach in the async producer captures the same tradeoff,
> > but
> > > a
> > > little more flexibly, it allows you to specify a max delay and max
> > > accumulated message count, data is written when either of those is hit.
> > >
> > > Is it possible to better capture this tradeoff? Basically I am not
> aware
> > of
> > > any other trick here if you are using TCP, so i would be interested in
> > what
> > > zeromq does if they are doing this better.
> > >
> > > We do indeed write each message set to the filesystem as it arrives but
> > we
> > > distribute messages to consumers only after the write has been flushed
> to
> > > disk, delaying (batching) that flush is the cause of the latency but
> also
> > > gives better use of IOPs by generating larger writes. Mmap would remove
> > the
> > > system call (which would be good), but not the flush I think. As you
> say,
> > > adding replication allows giving stronger guarantees without actually
> > > caring
> > > about durability on a particular server which would make it possible to
> > > distribute messages to consumers after ack from some number of other
> > > servers
> > > irrespective of flushing to disk.
> > >
> > > -Jay
> > >
> > > On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <ps...@quantbench.com>
> > > wrote:
> > >
> > > > Jun
> > > >
> > > > Thanks for your answers and the link to the paper - that helps a lot,
> > > > especially the comment in the paper that 10 second end to end latency
> > is
> > > > good enough for your intended use case.
> > > >
> > > > We're looking for much lower latencies, and the basic design of Kafka
> > > feels
> > > > like it should support latencies in milliseconds with a few changes.
> > > We're
> > > > either going to build our own system, or help develop something that
> > > > already
> > > > exists, so please take my comments in the constructive way they're
> > > intended
> > > > (I realize the changes I'm suggesting are outside your intended use
> > case,
> > > > but if you're interested we may be able to provide a very capable
> > > developer
> > > > to help with the work, assuming we choose kafka over the other
> zillion
> > > > streaming systems that are coming out of the woodwork).
> > > >
> > > > a. *Producer "queue.time"* - In my question 4 below, I was referring
> to
> > > the
> > > > producer queue time.  With a default value of 5 seconds, that
> accounts
> > > for
> > > > half your end to end latency. A system like zeromq is optimized to
> > write
> > > > data immediately without delay, but in such a way to minimizes the
> > number
> > > > of
> > > > system calls required during high throughput messages. Zeromq is no
> > > > nirvana,
> > > > but it has a number of nice properties.
> > > >
> > > > b. *Broker "log.default.flush.interval.ms"* - The default value of 3
> > > > seconds
> > > > appears to be another significant source of latency in the system,
> > > assuming
> > > > that clients are unable to access data until it has been flushed.
> Since
> > > you
> > > > have wisely chosen to take advantage of the buffer cache as part of
> > your
> > > > system design, it seems that you could remove this latency completely
> > by
> > > > memory mapping the partitions and memcpying each message as it
> arrives.
> > > > With
> > > > the right IPC mechanism clients could have immediate access to new
> > > > messages.
> > > >
> > > > c. *Batching, sync vs async, replication, and auditing*. Its
> > > understandable
> > > > that you've chosen a a forensic approach to producer reliability
> (after
> > > the
> > > > fact auditing), but when you implement replication it would be really
> > > nice
> > > > to revise the producer protocol mechanisms. If you used a streaming
> > > > mechanism with producer offsets and ACKs, you could ensure reliable
> > > > delivery
> > > > of producer streams to multiple brokers without the need to choose a
> > > "batch
> > > > size" or "queue.time". This could also give you active/active
> failover
> > of
> > > > brokers. This may also help in the WAN case (my question 3 below)
> > because
> > > > you will be able to adaptively stuff more and more data through the
> > fiber
> > > > for high bandwidth*delay links without having to choose a large
> "batch
> > > > size"
> > > > nor have the additional latency that entails. Oh, and it will help
> you
> > > deal
> > > > with CRC errors once you start checking for them.
> > > >
> > > > c. *Performance measurements* - I'd like to make a suggestion for
> your
> > > > performance measurements. Your benchmarks measure throughput, but a
> > > > throughput number is meaningless without an associated "% cpu time".
> > > > Ideally
> > > > all measurements achieve wire speed (100MB/sec) at 0% CPU (since,
> after
> > > > all,
> > > > this is plumbing and we assume the cores in the system should have
> > > capacity
> > > > set aside for useful work too). Obviously nobody ever achieves this,
> > but
> > > by
> > > > measuring it one can raise the bar in terms of optimization.
> > > >
> > > > Paul
> > > >
> > > > ps. Just for background, I am the cofounder at Quantcast where we
> > process
> > > > 3.5PB of data per day. These questions are related to my new startup
> > > > Quantbench which will deal with financial market data where you dont
> > want
> > > > any latency at all. And WAN issues are a big deal too. Incidentally,
> I
> > > was
> > > > also founder of Orbital Data which was a WAN optimization company so
> > I've
> > > > done a lot of work with protocols over long distances.
> > > >
> > > > On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <ju...@gmail.com> wrote:
> > > >
> > > > > Paul,
> > > > >
> > > > > Excellent questions. See my answers below. Thanks,
> > > > >
> > > > > On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <
> psutter@quantbench.com
> > >
> > > > > wrote:
> > > > >
> > > > > > Kafka looks like an exciting project, thanks for opening it up.
> > > > > >
> > > > > > I have a few questions:
> > > > > >
> > > > > > 1. Are checksums end to end (ie, created by the producer and
> > checked
> > > by
> > > > > the
> > > > > > consumer)? or are they only used to confirm buffercache behavior
> on
> > > > disk
> > > > > as
> > > > > > mentioned in the documentation? Bit errors occur vastly more
> often
> > > than
> > > > > > most
> > > > > > people assume, often because of device driver bugs. TCP only
> > detects
> > > 1
> > > > > > error
> > > > > > in 65536, so errors can flow through (if you like I can send
> links
> > to
> > > > > > papers
> > > > > > describing the need for checksums everywhere).
> > > > > >
> > > > >
> > > > > Checksum is generated at the producer and propagated to the broker
> > and
> > > > > eventually the consumer. Currently, we only validate the checksum
> at
> > > the
> > > > > broker. We could further validate it at the consumer in the future.
> > > > >
> > > > > >
> > > > > > 2. The consumer has a pretty solid mechanism to ensure it hasnt
> > > missed
> > > > > any
> > > > > > messages (i like the design by the way), but how does the
> producer
> > > know
> > > > > > that
> > > > > > all of its messages have been stored? (no apparent message id on
> > that
> > > > > side
> > > > > > since the message id isnt known until the message is written to
> the
> > > > > file).
> > > > > > I'm especially curious how failover/replication could be
> > implemented
> > > > and
> > > > > > I'm
> > > > > > thinking that acks on the publisher side may help)
> > > > > >
> > > > >
> > > > > The producer side auditing is not built-in. At LinkedIn, we do that
> > by
> > > > > generating an auditing event periodically in the eventhandler of
> the
> > > > async
> > > > > producer. The auditing event contains the number of events produced
> > in
> > > a
> > > > > configured window (e.g., 10 minutes) and are sent to a separate
> > topic.
> > > > The
> > > > > consumer can read the actual data and the auditing event and
> compare
> > > the
> > > > > counts. See our paper (
> > > > >
> > > > >
> > > >
> > >
> >
> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
> > > > > )
> > > > > for some more details.
> > > > >
> > > > >
> > > > > >
> > > > > > 3. Has the consumer's flow control been tested over high
> > > > bandwidth*delay
> > > > > > links? (what bandwidth can you get from a London consumer of an
> SF
> > > > > > cluster?)
> > > > > >
> > > > > > Yes, we actually replicate kafka data across data centers, using
> an
> > > > > embedded consumer in a broker. Again, there is a bit more info on
> > this
> > > in
> > > > > our paper.
> > > > >
> > > > >
> > > > > > 4. What kind of performance do you get if you set the producer's
> > > > message
> > > > > > delay to zero? (ie, is there a separate system call for each
> > message?
> > > > or
> > > > > do
> > > > > > you manage to aggregate messages into a smaller number of system
> > > calls
> > > > > even
> > > > > > with a delay of 0?)
> > > > > >
> > > > > > I assume that you are referring to the flush interval. One can
> > > > configure
> > > > > to
> > > > > flush every message to disk. This will slow down the throughput
> > > > > significantly.
> > > > >
> > > > >
> > > > > > 5. Have you considered using a library like zeromq for the
> > messaging
> > > > > layer
> > > > > > instead of rolling your own? (zeromq will handle #4 cleanly at
> > > millions
> > > > > of
> > > > > > messages per second and has clients in 20 languages)
> > > > > >
> > > > > > No. Our proprietary format allows us to support things like
> > > compression
> > > > > in
> > > > > the future. However, we can definitely look into the zeromq format.
> > Is
> > > > > their
> > > > > messaging layer easily extractable?
> > > > >
> > > > >
> > > > > > 6. Do you have any plans to support intermediate processing
> > elements
> > > > the
> > > > > > way
> > > > > > Flume supports?
> > > > > >
> > > > > > For now, we are just focusing on getting the raw messaging layer
> > > solid.
> > > > > We
> > > > > have worked a bit on streaming processing and will look into that
> > again
> > > > in
> > > > > the future.
> > > > >
> > > > >
> > > > > > 7. The docs mention that new versions will only be released after
> > > they
> > > > > are
> > > > > > in production at LinkedIn? Does that mean that the latest version
> > of
> > > > the
> > > > > > source code is hidden at LinkedIn and contributors would have to
> > > throw
> > > > > > patches over the wall and wait months to get the integrated
> > product?
> > > > > >
> > > > > > What we ran at LinkedIn is the same version in open source and
> > there
> > > is
> > > > > no
> > > > > internal repository of Kafka at LinkedIn. We plan to maintain that
> in
> > > the
> > > > > future.
> > > > >
> > > > >
> > > > > > Thanks!
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Kafka questions

Posted by Jay Kreps <ja...@gmail.com>.
Ah, I think what you are describing in zeromq is essentially the equivalent
of group commit for the socket. Essentially you wait until the socket is no
longer writable and then begin to queue data. This is an interesting idea.
Of course it would only have a positive effect when you had already
overflowed the socket buffer and were sending a very high throughput of
small messages. That basically is a way to degrade an overloaded synchronous
send into a batched send. This is not really the same as what we have done,
which is to allow the ability to trade off latency for throughput in a
configurable manner. The reason the later is important is that we do not
have a handful of producers sending at a rate that saturates the network I/O
capacity of those servers (the case where the group commit would help) but
rather we have thousands of producers sending at a medium low volume, so we
would never hit that in our use case. The advantage of batching is fewer
requests that hit the server, and larger packets. Where the group commit
would help is for the synchronous producer benchmarks, where you could
potentially get much better throughput. This is something we should consider
adding.

To be clear, though, we have not added latency in our layer, just made a
configurable way to trade-off latency for throughput. This is unambiguously
a good thing, I think.

With respect to mmap, i think you are misunderstanding where the latency
comes from. We immediately write data to the filesystem with no delay
whatsoever. This incurs the overhead of a system call, as you point out,
which could be avoided by mmap, but that doesn't add much in the way of
latency. The latency comes from the fact that we do not make the written
data available to consumers until we fsync the file to "ensure" the
durability of consumed messages. The frequency of the fsync is configurable,
anything either immediate or with a time or # messages threshold. This again
trades latency for throughput.

-Jay

On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <ps...@quantbench.com>wrote:

> *Producer latency* - I'm not familiar with zeromq internals but my
> understanding is that they send the first message(s) immediately and as TCP
> queues up the data, it will eventually block as the send buffer fills, and
> during this time messages can queue up, and thte net-net is that on average
> the number of system calls is << the number of messages. The key is having
> a
> separate thread for network operations with very efficient thread
> coordination.  Clearly Nagle is disabled (TCP_NODELAY) as Nagle is a blight
> against humanity.
>
> Having any sort of delay adds latency. If every developer thinks its OK to
> add a little latency in his layer, pretty soon you end up with 10 second
> end
> to end latency.
>
> Having an "accumulated message count" is also bad for WAN performance. If
> your "window size" is a set of delayed messages, the only way to deal with
> a
> large bandwidth*delay product is to delay a lot of messages, then send
> them.
> You can fit a lot of data into a fiber. Imagine a gigabit link with 100ms
> roundtrip time, you can store 100MB in the fiber. And you need a multiples
> of that for buffering if you need to do a retransmit.
>
> *Broker Latency *- With mmap the memcpy() of the message should make the
> data available to a thread even in another process, the pages that you have
> mapped are also in the buffer cache and available to a sendfile() call. or
> at least I think so. The flush to physical disk (or msync() in this case)
> would still be delayed but without impacting end to end latency.
>
> That said, in benchmarks I have done the fastest IO with the lowest CPU
> overhead is unbuffered (direct) IO (which is lower overhead than using the
> buffer cache with or without memory mapping), but then you'd have to manage
> your own buffer pool and run your broker in a single multithreaded process.
> But thats getting more extreme. Just getting rid of this buffer write delay
> by using memory mapping will remove a big chunk of latency.
>
>
>
>
>
>
>
>
>
> On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Hi Paul,
> >
> > We are definitely interested in lowering latency--lower is always
> > better--but that was not a major concern for us so far (we were replacing
> a
> > system with 1 hour latency), so we haven't focused on it yet. As you
> > describe latency in our setup at linkedin comes from batching on the
> > frontend and batching on the kafka servers do to very lenient flush
> > settings.
> >
> > I am interested in your comments on zeromq. Do they actually have a
> better
> > approach for this problem even when using TCP? If so I would be
> interested
> > to understand. The way I see things this is about trading throughput and
> > latency. On the producer side you have only a few options: immediately
> > write
> > the data to the socket buffer for sending or wait and see if the
> > application
> > writes more data. The OS will do this for you unless you set TCP_NODELAY,
> > but the OS is relatively inflexible, it doesn't understand your data so I
> > think it just waits 200ms or until the socket buffer is full.
> >
> > The current approach in the async producer captures the same tradeoff,
> but
> > a
> > little more flexibly, it allows you to specify a max delay and max
> > accumulated message count, data is written when either of those is hit.
> >
> > Is it possible to better capture this tradeoff? Basically I am not aware
> of
> > any other trick here if you are using TCP, so i would be interested in
> what
> > zeromq does if they are doing this better.
> >
> > We do indeed write each message set to the filesystem as it arrives but
> we
> > distribute messages to consumers only after the write has been flushed to
> > disk, delaying (batching) that flush is the cause of the latency but also
> > gives better use of IOPs by generating larger writes. Mmap would remove
> the
> > system call (which would be good), but not the flush I think. As you say,
> > adding replication allows giving stronger guarantees without actually
> > caring
> > about durability on a particular server which would make it possible to
> > distribute messages to consumers after ack from some number of other
> > servers
> > irrespective of flushing to disk.
> >
> > -Jay
> >
> > On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <ps...@quantbench.com>
> > wrote:
> >
> > > Jun
> > >
> > > Thanks for your answers and the link to the paper - that helps a lot,
> > > especially the comment in the paper that 10 second end to end latency
> is
> > > good enough for your intended use case.
> > >
> > > We're looking for much lower latencies, and the basic design of Kafka
> > feels
> > > like it should support latencies in milliseconds with a few changes.
> > We're
> > > either going to build our own system, or help develop something that
> > > already
> > > exists, so please take my comments in the constructive way they're
> > intended
> > > (I realize the changes I'm suggesting are outside your intended use
> case,
> > > but if you're interested we may be able to provide a very capable
> > developer
> > > to help with the work, assuming we choose kafka over the other zillion
> > > streaming systems that are coming out of the woodwork).
> > >
> > > a. *Producer "queue.time"* - In my question 4 below, I was referring to
> > the
> > > producer queue time.  With a default value of 5 seconds, that accounts
> > for
> > > half your end to end latency. A system like zeromq is optimized to
> write
> > > data immediately without delay, but in such a way to minimizes the
> number
> > > of
> > > system calls required during high throughput messages. Zeromq is no
> > > nirvana,
> > > but it has a number of nice properties.
> > >
> > > b. *Broker "log.default.flush.interval.ms"* - The default value of 3
> > > seconds
> > > appears to be another significant source of latency in the system,
> > assuming
> > > that clients are unable to access data until it has been flushed. Since
> > you
> > > have wisely chosen to take advantage of the buffer cache as part of
> your
> > > system design, it seems that you could remove this latency completely
> by
> > > memory mapping the partitions and memcpying each message as it arrives.
> > > With
> > > the right IPC mechanism clients could have immediate access to new
> > > messages.
> > >
> > > c. *Batching, sync vs async, replication, and auditing*. Its
> > understandable
> > > that you've chosen a a forensic approach to producer reliability (after
> > the
> > > fact auditing), but when you implement replication it would be really
> > nice
> > > to revise the producer protocol mechanisms. If you used a streaming
> > > mechanism with producer offsets and ACKs, you could ensure reliable
> > > delivery
> > > of producer streams to multiple brokers without the need to choose a
> > "batch
> > > size" or "queue.time". This could also give you active/active failover
> of
> > > brokers. This may also help in the WAN case (my question 3 below)
> because
> > > you will be able to adaptively stuff more and more data through the
> fiber
> > > for high bandwidth*delay links without having to choose a large "batch
> > > size"
> > > nor have the additional latency that entails. Oh, and it will help you
> > deal
> > > with CRC errors once you start checking for them.
> > >
> > > c. *Performance measurements* - I'd like to make a suggestion for your
> > > performance measurements. Your benchmarks measure throughput, but a
> > > throughput number is meaningless without an associated "% cpu time".
> > > Ideally
> > > all measurements achieve wire speed (100MB/sec) at 0% CPU (since, after
> > > all,
> > > this is plumbing and we assume the cores in the system should have
> > capacity
> > > set aside for useful work too). Obviously nobody ever achieves this,
> but
> > by
> > > measuring it one can raise the bar in terms of optimization.
> > >
> > > Paul
> > >
> > > ps. Just for background, I am the cofounder at Quantcast where we
> process
> > > 3.5PB of data per day. These questions are related to my new startup
> > > Quantbench which will deal with financial market data where you dont
> want
> > > any latency at all. And WAN issues are a big deal too. Incidentally, I
> > was
> > > also founder of Orbital Data which was a WAN optimization company so
> I've
> > > done a lot of work with protocols over long distances.
> > >
> > > On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <ju...@gmail.com> wrote:
> > >
> > > > Paul,
> > > >
> > > > Excellent questions. See my answers below. Thanks,
> > > >
> > > > On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <psutter@quantbench.com
> >
> > > > wrote:
> > > >
> > > > > Kafka looks like an exciting project, thanks for opening it up.
> > > > >
> > > > > I have a few questions:
> > > > >
> > > > > 1. Are checksums end to end (ie, created by the producer and
> checked
> > by
> > > > the
> > > > > consumer)? or are they only used to confirm buffercache behavior on
> > > disk
> > > > as
> > > > > mentioned in the documentation? Bit errors occur vastly more often
> > than
> > > > > most
> > > > > people assume, often because of device driver bugs. TCP only
> detects
> > 1
> > > > > error
> > > > > in 65536, so errors can flow through (if you like I can send links
> to
> > > > > papers
> > > > > describing the need for checksums everywhere).
> > > > >
> > > >
> > > > Checksum is generated at the producer and propagated to the broker
> and
> > > > eventually the consumer. Currently, we only validate the checksum at
> > the
> > > > broker. We could further validate it at the consumer in the future.
> > > >
> > > > >
> > > > > 2. The consumer has a pretty solid mechanism to ensure it hasnt
> > missed
> > > > any
> > > > > messages (i like the design by the way), but how does the producer
> > know
> > > > > that
> > > > > all of its messages have been stored? (no apparent message id on
> that
> > > > side
> > > > > since the message id isnt known until the message is written to the
> > > > file).
> > > > > I'm especially curious how failover/replication could be
> implemented
> > > and
> > > > > I'm
> > > > > thinking that acks on the publisher side may help)
> > > > >
> > > >
> > > > The producer side auditing is not built-in. At LinkedIn, we do that
> by
> > > > generating an auditing event periodically in the eventhandler of the
> > > async
> > > > producer. The auditing event contains the number of events produced
> in
> > a
> > > > configured window (e.g., 10 minutes) and are sent to a separate
> topic.
> > > The
> > > > consumer can read the actual data and the auditing event and compare
> > the
> > > > counts. See our paper (
> > > >
> > > >
> > >
> >
> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
> > > > )
> > > > for some more details.
> > > >
> > > >
> > > > >
> > > > > 3. Has the consumer's flow control been tested over high
> > > bandwidth*delay
> > > > > links? (what bandwidth can you get from a London consumer of an SF
> > > > > cluster?)
> > > > >
> > > > > Yes, we actually replicate kafka data across data centers, using an
> > > > embedded consumer in a broker. Again, there is a bit more info on
> this
> > in
> > > > our paper.
> > > >
> > > >
> > > > > 4. What kind of performance do you get if you set the producer's
> > > message
> > > > > delay to zero? (ie, is there a separate system call for each
> message?
> > > or
> > > > do
> > > > > you manage to aggregate messages into a smaller number of system
> > calls
> > > > even
> > > > > with a delay of 0?)
> > > > >
> > > > > I assume that you are referring to the flush interval. One can
> > > configure
> > > > to
> > > > flush every message to disk. This will slow down the throughput
> > > > significantly.
> > > >
> > > >
> > > > > 5. Have you considered using a library like zeromq for the
> messaging
> > > > layer
> > > > > instead of rolling your own? (zeromq will handle #4 cleanly at
> > millions
> > > > of
> > > > > messages per second and has clients in 20 languages)
> > > > >
> > > > > No. Our proprietary format allows us to support things like
> > compression
> > > > in
> > > > the future. However, we can definitely look into the zeromq format.
> Is
> > > > their
> > > > messaging layer easily extractable?
> > > >
> > > >
> > > > > 6. Do you have any plans to support intermediate processing
> elements
> > > the
> > > > > way
> > > > > Flume supports?
> > > > >
> > > > > For now, we are just focusing on getting the raw messaging layer
> > solid.
> > > > We
> > > > have worked a bit on streaming processing and will look into that
> again
> > > in
> > > > the future.
> > > >
> > > >
> > > > > 7. The docs mention that new versions will only be released after
> > they
> > > > are
> > > > > in production at LinkedIn? Does that mean that the latest version
> of
> > > the
> > > > > source code is hidden at LinkedIn and contributors would have to
> > throw
> > > > > patches over the wall and wait months to get the integrated
> product?
> > > > >
> > > > > What we ran at LinkedIn is the same version in open source and
> there
> > is
> > > > no
> > > > internal repository of Kafka at LinkedIn. We plan to maintain that in
> > the
> > > > future.
> > > >
> > > >
> > > > > Thanks!
> > > > >
> > > >
> > >
> >
>

Re: Kafka questions

Posted by Paul Sutter <ps...@quantbench.com>.
*Producer latency* - I'm not familiar with zeromq internals but my
understanding is that they send the first message(s) immediately and as TCP
queues up the data, it will eventually block as the send buffer fills, and
during this time messages can queue up, and thte net-net is that on average
the number of system calls is << the number of messages. The key is having a
separate thread for network operations with very efficient thread
coordination.  Clearly Nagle is disabled (TCP_NODELAY) as Nagle is a blight
against humanity.

Having any sort of delay adds latency. If every developer thinks its OK to
add a little latency in his layer, pretty soon you end up with 10 second end
to end latency.

Having an "accumulated message count" is also bad for WAN performance. If
your "window size" is a set of delayed messages, the only way to deal with a
large bandwidth*delay product is to delay a lot of messages, then send them.
You can fit a lot of data into a fiber. Imagine a gigabit link with 100ms
roundtrip time, you can store 100MB in the fiber. And you need a multiples
of that for buffering if you need to do a retransmit.

*Broker Latency *- With mmap the memcpy() of the message should make the
data available to a thread even in another process, the pages that you have
mapped are also in the buffer cache and available to a sendfile() call. or
at least I think so. The flush to physical disk (or msync() in this case)
would still be delayed but without impacting end to end latency.

That said, in benchmarks I have done the fastest IO with the lowest CPU
overhead is unbuffered (direct) IO (which is lower overhead than using the
buffer cache with or without memory mapping), but then you'd have to manage
your own buffer pool and run your broker in a single multithreaded process.
But thats getting more extreme. Just getting rid of this buffer write delay
by using memory mapping will remove a big chunk of latency.









On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <ja...@gmail.com> wrote:

> Hi Paul,
>
> We are definitely interested in lowering latency--lower is always
> better--but that was not a major concern for us so far (we were replacing a
> system with 1 hour latency), so we haven't focused on it yet. As you
> describe latency in our setup at linkedin comes from batching on the
> frontend and batching on the kafka servers do to very lenient flush
> settings.
>
> I am interested in your comments on zeromq. Do they actually have a better
> approach for this problem even when using TCP? If so I would be interested
> to understand. The way I see things this is about trading throughput and
> latency. On the producer side you have only a few options: immediately
> write
> the data to the socket buffer for sending or wait and see if the
> application
> writes more data. The OS will do this for you unless you set TCP_NODELAY,
> but the OS is relatively inflexible, it doesn't understand your data so I
> think it just waits 200ms or until the socket buffer is full.
>
> The current approach in the async producer captures the same tradeoff, but
> a
> little more flexibly, it allows you to specify a max delay and max
> accumulated message count, data is written when either of those is hit.
>
> Is it possible to better capture this tradeoff? Basically I am not aware of
> any other trick here if you are using TCP, so i would be interested in what
> zeromq does if they are doing this better.
>
> We do indeed write each message set to the filesystem as it arrives but we
> distribute messages to consumers only after the write has been flushed to
> disk, delaying (batching) that flush is the cause of the latency but also
> gives better use of IOPs by generating larger writes. Mmap would remove the
> system call (which would be good), but not the flush I think. As you say,
> adding replication allows giving stronger guarantees without actually
> caring
> about durability on a particular server which would make it possible to
> distribute messages to consumers after ack from some number of other
> servers
> irrespective of flushing to disk.
>
> -Jay
>
> On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <ps...@quantbench.com>
> wrote:
>
> > Jun
> >
> > Thanks for your answers and the link to the paper - that helps a lot,
> > especially the comment in the paper that 10 second end to end latency is
> > good enough for your intended use case.
> >
> > We're looking for much lower latencies, and the basic design of Kafka
> feels
> > like it should support latencies in milliseconds with a few changes.
> We're
> > either going to build our own system, or help develop something that
> > already
> > exists, so please take my comments in the constructive way they're
> intended
> > (I realize the changes I'm suggesting are outside your intended use case,
> > but if you're interested we may be able to provide a very capable
> developer
> > to help with the work, assuming we choose kafka over the other zillion
> > streaming systems that are coming out of the woodwork).
> >
> > a. *Producer "queue.time"* - In my question 4 below, I was referring to
> the
> > producer queue time.  With a default value of 5 seconds, that accounts
> for
> > half your end to end latency. A system like zeromq is optimized to write
> > data immediately without delay, but in such a way to minimizes the number
> > of
> > system calls required during high throughput messages. Zeromq is no
> > nirvana,
> > but it has a number of nice properties.
> >
> > b. *Broker "log.default.flush.interval.ms"* - The default value of 3
> > seconds
> > appears to be another significant source of latency in the system,
> assuming
> > that clients are unable to access data until it has been flushed. Since
> you
> > have wisely chosen to take advantage of the buffer cache as part of your
> > system design, it seems that you could remove this latency completely by
> > memory mapping the partitions and memcpying each message as it arrives.
> > With
> > the right IPC mechanism clients could have immediate access to new
> > messages.
> >
> > c. *Batching, sync vs async, replication, and auditing*. Its
> understandable
> > that you've chosen a a forensic approach to producer reliability (after
> the
> > fact auditing), but when you implement replication it would be really
> nice
> > to revise the producer protocol mechanisms. If you used a streaming
> > mechanism with producer offsets and ACKs, you could ensure reliable
> > delivery
> > of producer streams to multiple brokers without the need to choose a
> "batch
> > size" or "queue.time". This could also give you active/active failover of
> > brokers. This may also help in the WAN case (my question 3 below) because
> > you will be able to adaptively stuff more and more data through the fiber
> > for high bandwidth*delay links without having to choose a large "batch
> > size"
> > nor have the additional latency that entails. Oh, and it will help you
> deal
> > with CRC errors once you start checking for them.
> >
> > c. *Performance measurements* - I'd like to make a suggestion for your
> > performance measurements. Your benchmarks measure throughput, but a
> > throughput number is meaningless without an associated "% cpu time".
> > Ideally
> > all measurements achieve wire speed (100MB/sec) at 0% CPU (since, after
> > all,
> > this is plumbing and we assume the cores in the system should have
> capacity
> > set aside for useful work too). Obviously nobody ever achieves this, but
> by
> > measuring it one can raise the bar in terms of optimization.
> >
> > Paul
> >
> > ps. Just for background, I am the cofounder at Quantcast where we process
> > 3.5PB of data per day. These questions are related to my new startup
> > Quantbench which will deal with financial market data where you dont want
> > any latency at all. And WAN issues are a big deal too. Incidentally, I
> was
> > also founder of Orbital Data which was a WAN optimization company so I've
> > done a lot of work with protocols over long distances.
> >
> > On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <ju...@gmail.com> wrote:
> >
> > > Paul,
> > >
> > > Excellent questions. See my answers below. Thanks,
> > >
> > > On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <ps...@quantbench.com>
> > > wrote:
> > >
> > > > Kafka looks like an exciting project, thanks for opening it up.
> > > >
> > > > I have a few questions:
> > > >
> > > > 1. Are checksums end to end (ie, created by the producer and checked
> by
> > > the
> > > > consumer)? or are they only used to confirm buffercache behavior on
> > disk
> > > as
> > > > mentioned in the documentation? Bit errors occur vastly more often
> than
> > > > most
> > > > people assume, often because of device driver bugs. TCP only detects
> 1
> > > > error
> > > > in 65536, so errors can flow through (if you like I can send links to
> > > > papers
> > > > describing the need for checksums everywhere).
> > > >
> > >
> > > Checksum is generated at the producer and propagated to the broker and
> > > eventually the consumer. Currently, we only validate the checksum at
> the
> > > broker. We could further validate it at the consumer in the future.
> > >
> > > >
> > > > 2. The consumer has a pretty solid mechanism to ensure it hasnt
> missed
> > > any
> > > > messages (i like the design by the way), but how does the producer
> know
> > > > that
> > > > all of its messages have been stored? (no apparent message id on that
> > > side
> > > > since the message id isnt known until the message is written to the
> > > file).
> > > > I'm especially curious how failover/replication could be implemented
> > and
> > > > I'm
> > > > thinking that acks on the publisher side may help)
> > > >
> > >
> > > The producer side auditing is not built-in. At LinkedIn, we do that by
> > > generating an auditing event periodically in the eventhandler of the
> > async
> > > producer. The auditing event contains the number of events produced in
> a
> > > configured window (e.g., 10 minutes) and are sent to a separate topic.
> > The
> > > consumer can read the actual data and the auditing event and compare
> the
> > > counts. See our paper (
> > >
> > >
> >
> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
> > > )
> > > for some more details.
> > >
> > >
> > > >
> > > > 3. Has the consumer's flow control been tested over high
> > bandwidth*delay
> > > > links? (what bandwidth can you get from a London consumer of an SF
> > > > cluster?)
> > > >
> > > > Yes, we actually replicate kafka data across data centers, using an
> > > embedded consumer in a broker. Again, there is a bit more info on this
> in
> > > our paper.
> > >
> > >
> > > > 4. What kind of performance do you get if you set the producer's
> > message
> > > > delay to zero? (ie, is there a separate system call for each message?
> > or
> > > do
> > > > you manage to aggregate messages into a smaller number of system
> calls
> > > even
> > > > with a delay of 0?)
> > > >
> > > > I assume that you are referring to the flush interval. One can
> > configure
> > > to
> > > flush every message to disk. This will slow down the throughput
> > > significantly.
> > >
> > >
> > > > 5. Have you considered using a library like zeromq for the messaging
> > > layer
> > > > instead of rolling your own? (zeromq will handle #4 cleanly at
> millions
> > > of
> > > > messages per second and has clients in 20 languages)
> > > >
> > > > No. Our proprietary format allows us to support things like
> compression
> > > in
> > > the future. However, we can definitely look into the zeromq format. Is
> > > their
> > > messaging layer easily extractable?
> > >
> > >
> > > > 6. Do you have any plans to support intermediate processing elements
> > the
> > > > way
> > > > Flume supports?
> > > >
> > > > For now, we are just focusing on getting the raw messaging layer
> solid.
> > > We
> > > have worked a bit on streaming processing and will look into that again
> > in
> > > the future.
> > >
> > >
> > > > 7. The docs mention that new versions will only be released after
> they
> > > are
> > > > in production at LinkedIn? Does that mean that the latest version of
> > the
> > > > source code is hidden at LinkedIn and contributors would have to
> throw
> > > > patches over the wall and wait months to get the integrated product?
> > > >
> > > > What we ran at LinkedIn is the same version in open source and there
> is
> > > no
> > > internal repository of Kafka at LinkedIn. We plan to maintain that in
> the
> > > future.
> > >
> > >
> > > > Thanks!
> > > >
> > >
> >
>

Re: Kafka questions

Posted by Jay Kreps <ja...@gmail.com>.
Hi Paul,

We are definitely interested in lowering latency--lower is always
better--but that was not a major concern for us so far (we were replacing a
system with 1 hour latency), so we haven't focused on it yet. As you
describe latency in our setup at linkedin comes from batching on the
frontend and batching on the kafka servers do to very lenient flush
settings.

I am interested in your comments on zeromq. Do they actually have a better
approach for this problem even when using TCP? If so I would be interested
to understand. The way I see things this is about trading throughput and
latency. On the producer side you have only a few options: immediately write
the data to the socket buffer for sending or wait and see if the application
writes more data. The OS will do this for you unless you set TCP_NODELAY,
but the OS is relatively inflexible, it doesn't understand your data so I
think it just waits 200ms or until the socket buffer is full.

The current approach in the async producer captures the same tradeoff, but a
little more flexibly, it allows you to specify a max delay and max
accumulated message count, data is written when either of those is hit.

Is it possible to better capture this tradeoff? Basically I am not aware of
any other trick here if you are using TCP, so i would be interested in what
zeromq does if they are doing this better.

We do indeed write each message set to the filesystem as it arrives but we
distribute messages to consumers only after the write has been flushed to
disk, delaying (batching) that flush is the cause of the latency but also
gives better use of IOPs by generating larger writes. Mmap would remove the
system call (which would be good), but not the flush I think. As you say,
adding replication allows giving stronger guarantees without actually caring
about durability on a particular server which would make it possible to
distribute messages to consumers after ack from some number of other servers
irrespective of flushing to disk.

-Jay

On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <ps...@quantbench.com> wrote:

> Jun
>
> Thanks for your answers and the link to the paper - that helps a lot,
> especially the comment in the paper that 10 second end to end latency is
> good enough for your intended use case.
>
> We're looking for much lower latencies, and the basic design of Kafka feels
> like it should support latencies in milliseconds with a few changes. We're
> either going to build our own system, or help develop something that
> already
> exists, so please take my comments in the constructive way they're intended
> (I realize the changes I'm suggesting are outside your intended use case,
> but if you're interested we may be able to provide a very capable developer
> to help with the work, assuming we choose kafka over the other zillion
> streaming systems that are coming out of the woodwork).
>
> a. *Producer "queue.time"* - In my question 4 below, I was referring to the
> producer queue time.  With a default value of 5 seconds, that accounts for
> half your end to end latency. A system like zeromq is optimized to write
> data immediately without delay, but in such a way to minimizes the number
> of
> system calls required during high throughput messages. Zeromq is no
> nirvana,
> but it has a number of nice properties.
>
> b. *Broker "log.default.flush.interval.ms"* - The default value of 3
> seconds
> appears to be another significant source of latency in the system, assuming
> that clients are unable to access data until it has been flushed. Since you
> have wisely chosen to take advantage of the buffer cache as part of your
> system design, it seems that you could remove this latency completely by
> memory mapping the partitions and memcpying each message as it arrives.
> With
> the right IPC mechanism clients could have immediate access to new
> messages.
>
> c. *Batching, sync vs async, replication, and auditing*. Its understandable
> that you've chosen a a forensic approach to producer reliability (after the
> fact auditing), but when you implement replication it would be really nice
> to revise the producer protocol mechanisms. If you used a streaming
> mechanism with producer offsets and ACKs, you could ensure reliable
> delivery
> of producer streams to multiple brokers without the need to choose a "batch
> size" or "queue.time". This could also give you active/active failover of
> brokers. This may also help in the WAN case (my question 3 below) because
> you will be able to adaptively stuff more and more data through the fiber
> for high bandwidth*delay links without having to choose a large "batch
> size"
> nor have the additional latency that entails. Oh, and it will help you deal
> with CRC errors once you start checking for them.
>
> c. *Performance measurements* - I'd like to make a suggestion for your
> performance measurements. Your benchmarks measure throughput, but a
> throughput number is meaningless without an associated "% cpu time".
> Ideally
> all measurements achieve wire speed (100MB/sec) at 0% CPU (since, after
> all,
> this is plumbing and we assume the cores in the system should have capacity
> set aside for useful work too). Obviously nobody ever achieves this, but by
> measuring it one can raise the bar in terms of optimization.
>
> Paul
>
> ps. Just for background, I am the cofounder at Quantcast where we process
> 3.5PB of data per day. These questions are related to my new startup
> Quantbench which will deal with financial market data where you dont want
> any latency at all. And WAN issues are a big deal too. Incidentally, I was
> also founder of Orbital Data which was a WAN optimization company so I've
> done a lot of work with protocols over long distances.
>
> On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <ju...@gmail.com> wrote:
>
> > Paul,
> >
> > Excellent questions. See my answers below. Thanks,
> >
> > On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <ps...@quantbench.com>
> > wrote:
> >
> > > Kafka looks like an exciting project, thanks for opening it up.
> > >
> > > I have a few questions:
> > >
> > > 1. Are checksums end to end (ie, created by the producer and checked by
> > the
> > > consumer)? or are they only used to confirm buffercache behavior on
> disk
> > as
> > > mentioned in the documentation? Bit errors occur vastly more often than
> > > most
> > > people assume, often because of device driver bugs. TCP only detects 1
> > > error
> > > in 65536, so errors can flow through (if you like I can send links to
> > > papers
> > > describing the need for checksums everywhere).
> > >
> >
> > Checksum is generated at the producer and propagated to the broker and
> > eventually the consumer. Currently, we only validate the checksum at the
> > broker. We could further validate it at the consumer in the future.
> >
> > >
> > > 2. The consumer has a pretty solid mechanism to ensure it hasnt missed
> > any
> > > messages (i like the design by the way), but how does the producer know
> > > that
> > > all of its messages have been stored? (no apparent message id on that
> > side
> > > since the message id isnt known until the message is written to the
> > file).
> > > I'm especially curious how failover/replication could be implemented
> and
> > > I'm
> > > thinking that acks on the publisher side may help)
> > >
> >
> > The producer side auditing is not built-in. At LinkedIn, we do that by
> > generating an auditing event periodically in the eventhandler of the
> async
> > producer. The auditing event contains the number of events produced in a
> > configured window (e.g., 10 minutes) and are sent to a separate topic.
> The
> > consumer can read the actual data and the auditing event and compare the
> > counts. See our paper (
> >
> >
> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
> > )
> > for some more details.
> >
> >
> > >
> > > 3. Has the consumer's flow control been tested over high
> bandwidth*delay
> > > links? (what bandwidth can you get from a London consumer of an SF
> > > cluster?)
> > >
> > > Yes, we actually replicate kafka data across data centers, using an
> > embedded consumer in a broker. Again, there is a bit more info on this in
> > our paper.
> >
> >
> > > 4. What kind of performance do you get if you set the producer's
> message
> > > delay to zero? (ie, is there a separate system call for each message?
> or
> > do
> > > you manage to aggregate messages into a smaller number of system calls
> > even
> > > with a delay of 0?)
> > >
> > > I assume that you are referring to the flush interval. One can
> configure
> > to
> > flush every message to disk. This will slow down the throughput
> > significantly.
> >
> >
> > > 5. Have you considered using a library like zeromq for the messaging
> > layer
> > > instead of rolling your own? (zeromq will handle #4 cleanly at millions
> > of
> > > messages per second and has clients in 20 languages)
> > >
> > > No. Our proprietary format allows us to support things like compression
> > in
> > the future. However, we can definitely look into the zeromq format. Is
> > their
> > messaging layer easily extractable?
> >
> >
> > > 6. Do you have any plans to support intermediate processing elements
> the
> > > way
> > > Flume supports?
> > >
> > > For now, we are just focusing on getting the raw messaging layer solid.
> > We
> > have worked a bit on streaming processing and will look into that again
> in
> > the future.
> >
> >
> > > 7. The docs mention that new versions will only be released after they
> > are
> > > in production at LinkedIn? Does that mean that the latest version of
> the
> > > source code is hidden at LinkedIn and contributors would have to throw
> > > patches over the wall and wait months to get the integrated product?
> > >
> > > What we ran at LinkedIn is the same version in open source and there is
> > no
> > internal repository of Kafka at LinkedIn. We plan to maintain that in the
> > future.
> >
> >
> > > Thanks!
> > >
> >
>

Re: Kafka questions

Posted by Paul Sutter <ps...@quantbench.com>.
Jun

Thanks for your answers and the link to the paper - that helps a lot,
especially the comment in the paper that 10 second end to end latency is
good enough for your intended use case.

We're looking for much lower latencies, and the basic design of Kafka feels
like it should support latencies in milliseconds with a few changes. We're
either going to build our own system, or help develop something that already
exists, so please take my comments in the constructive way they're intended
(I realize the changes I'm suggesting are outside your intended use case,
but if you're interested we may be able to provide a very capable developer
to help with the work, assuming we choose kafka over the other zillion
streaming systems that are coming out of the woodwork).

a. *Producer "queue.time"* - In my question 4 below, I was referring to the
producer queue time.  With a default value of 5 seconds, that accounts for
half your end to end latency. A system like zeromq is optimized to write
data immediately without delay, but in such a way to minimizes the number of
system calls required during high throughput messages. Zeromq is no nirvana,
but it has a number of nice properties.

b. *Broker "log.default.flush.interval.ms"* - The default value of 3 seconds
appears to be another significant source of latency in the system, assuming
that clients are unable to access data until it has been flushed. Since you
have wisely chosen to take advantage of the buffer cache as part of your
system design, it seems that you could remove this latency completely by
memory mapping the partitions and memcpying each message as it arrives. With
the right IPC mechanism clients could have immediate access to new messages.

c. *Batching, sync vs async, replication, and auditing*. Its understandable
that you've chosen a a forensic approach to producer reliability (after the
fact auditing), but when you implement replication it would be really nice
to revise the producer protocol mechanisms. If you used a streaming
mechanism with producer offsets and ACKs, you could ensure reliable delivery
of producer streams to multiple brokers without the need to choose a "batch
size" or "queue.time". This could also give you active/active failover of
brokers. This may also help in the WAN case (my question 3 below) because
you will be able to adaptively stuff more and more data through the fiber
for high bandwidth*delay links without having to choose a large "batch size"
nor have the additional latency that entails. Oh, and it will help you deal
with CRC errors once you start checking for them.

c. *Performance measurements* - I'd like to make a suggestion for your
performance measurements. Your benchmarks measure throughput, but a
throughput number is meaningless without an associated "% cpu time". Ideally
all measurements achieve wire speed (100MB/sec) at 0% CPU (since, after all,
this is plumbing and we assume the cores in the system should have capacity
set aside for useful work too). Obviously nobody ever achieves this, but by
measuring it one can raise the bar in terms of optimization.

Paul

ps. Just for background, I am the cofounder at Quantcast where we process
3.5PB of data per day. These questions are related to my new startup
Quantbench which will deal with financial market data where you dont want
any latency at all. And WAN issues are a big deal too. Incidentally, I was
also founder of Orbital Data which was a WAN optimization company so I've
done a lot of work with protocols over long distances.

On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <ju...@gmail.com> wrote:

> Paul,
>
> Excellent questions. See my answers below. Thanks,
>
> On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <ps...@quantbench.com>
> wrote:
>
> > Kafka looks like an exciting project, thanks for opening it up.
> >
> > I have a few questions:
> >
> > 1. Are checksums end to end (ie, created by the producer and checked by
> the
> > consumer)? or are they only used to confirm buffercache behavior on disk
> as
> > mentioned in the documentation? Bit errors occur vastly more often than
> > most
> > people assume, often because of device driver bugs. TCP only detects 1
> > error
> > in 65536, so errors can flow through (if you like I can send links to
> > papers
> > describing the need for checksums everywhere).
> >
>
> Checksum is generated at the producer and propagated to the broker and
> eventually the consumer. Currently, we only validate the checksum at the
> broker. We could further validate it at the consumer in the future.
>
> >
> > 2. The consumer has a pretty solid mechanism to ensure it hasnt missed
> any
> > messages (i like the design by the way), but how does the producer know
> > that
> > all of its messages have been stored? (no apparent message id on that
> side
> > since the message id isnt known until the message is written to the
> file).
> > I'm especially curious how failover/replication could be implemented and
> > I'm
> > thinking that acks on the publisher side may help)
> >
>
> The producer side auditing is not built-in. At LinkedIn, we do that by
> generating an auditing event periodically in the eventhandler of the async
> producer. The auditing event contains the number of events produced in a
> configured window (e.g., 10 minutes) and are sent to a separate topic. The
> consumer can read the actual data and the auditing event and compare the
> counts. See our paper (
>
> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
> )
> for some more details.
>
>
> >
> > 3. Has the consumer's flow control been tested over high bandwidth*delay
> > links? (what bandwidth can you get from a London consumer of an SF
> > cluster?)
> >
> > Yes, we actually replicate kafka data across data centers, using an
> embedded consumer in a broker. Again, there is a bit more info on this in
> our paper.
>
>
> > 4. What kind of performance do you get if you set the producer's message
> > delay to zero? (ie, is there a separate system call for each message? or
> do
> > you manage to aggregate messages into a smaller number of system calls
> even
> > with a delay of 0?)
> >
> > I assume that you are referring to the flush interval. One can configure
> to
> flush every message to disk. This will slow down the throughput
> significantly.
>
>
> > 5. Have you considered using a library like zeromq for the messaging
> layer
> > instead of rolling your own? (zeromq will handle #4 cleanly at millions
> of
> > messages per second and has clients in 20 languages)
> >
> > No. Our proprietary format allows us to support things like compression
> in
> the future. However, we can definitely look into the zeromq format. Is
> their
> messaging layer easily extractable?
>
>
> > 6. Do you have any plans to support intermediate processing elements the
> > way
> > Flume supports?
> >
> > For now, we are just focusing on getting the raw messaging layer solid.
> We
> have worked a bit on streaming processing and will look into that again in
> the future.
>
>
> > 7. The docs mention that new versions will only be released after they
> are
> > in production at LinkedIn? Does that mean that the latest version of the
> > source code is hidden at LinkedIn and contributors would have to throw
> > patches over the wall and wait months to get the integrated product?
> >
> > What we ran at LinkedIn is the same version in open source and there is
> no
> internal repository of Kafka at LinkedIn. We plan to maintain that in the
> future.
>
>
> > Thanks!
> >
>

Re: Kafka questions

Posted by Jun Rao <ju...@gmail.com>.
Paul,

Excellent questions. See my answers below. Thanks,

On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <ps...@quantbench.com> wrote:

> Kafka looks like an exciting project, thanks for opening it up.
>
> I have a few questions:
>
> 1. Are checksums end to end (ie, created by the producer and checked by the
> consumer)? or are they only used to confirm buffercache behavior on disk as
> mentioned in the documentation? Bit errors occur vastly more often than
> most
> people assume, often because of device driver bugs. TCP only detects 1
> error
> in 65536, so errors can flow through (if you like I can send links to
> papers
> describing the need for checksums everywhere).
>

Checksum is generated at the producer and propagated to the broker and
eventually the consumer. Currently, we only validate the checksum at the
broker. We could further validate it at the consumer in the future.

>
> 2. The consumer has a pretty solid mechanism to ensure it hasnt missed any
> messages (i like the design by the way), but how does the producer know
> that
> all of its messages have been stored? (no apparent message id on that side
> since the message id isnt known until the message is written to the file).
> I'm especially curious how failover/replication could be implemented and
> I'm
> thinking that acks on the publisher side may help)
>

The producer side auditing is not built-in. At LinkedIn, we do that by
generating an auditing event periodically in the eventhandler of the async
producer. The auditing event contains the number of events produced in a
configured window (e.g., 10 minutes) and are sent to a separate topic. The
consumer can read the actual data and the auditing event and compare the
counts. See our paper (
http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf)
for some more details.


>
> 3. Has the consumer's flow control been tested over high bandwidth*delay
> links? (what bandwidth can you get from a London consumer of an SF
> cluster?)
>
> Yes, we actually replicate kafka data across data centers, using an
embedded consumer in a broker. Again, there is a bit more info on this in
our paper.


> 4. What kind of performance do you get if you set the producer's message
> delay to zero? (ie, is there a separate system call for each message? or do
> you manage to aggregate messages into a smaller number of system calls even
> with a delay of 0?)
>
> I assume that you are referring to the flush interval. One can configure to
flush every message to disk. This will slow down the throughput
significantly.


> 5. Have you considered using a library like zeromq for the messaging layer
> instead of rolling your own? (zeromq will handle #4 cleanly at millions of
> messages per second and has clients in 20 languages)
>
> No. Our proprietary format allows us to support things like compression in
the future. However, we can definitely look into the zeromq format. Is their
messaging layer easily extractable?


> 6. Do you have any plans to support intermediate processing elements the
> way
> Flume supports?
>
> For now, we are just focusing on getting the raw messaging layer solid. We
have worked a bit on streaming processing and will look into that again in
the future.


> 7. The docs mention that new versions will only be released after they are
> in production at LinkedIn? Does that mean that the latest version of the
> source code is hidden at LinkedIn and contributors would have to throw
> patches over the wall and wait months to get the integrated product?
>
> What we ran at LinkedIn is the same version in open source and there is no
internal repository of Kafka at LinkedIn. We plan to maintain that in the
future.


> Thanks!
>