You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Jason Rosenberg <jb...@squareup.com> on 2012/10/11 00:57:42 UTC

implications of using large number of topics....

Hi,

I'm exploring using kafka for the first time.

I'm contemplating a system where we transmit metric data at regular
intervals to kafka.  One question I have is whether to generate simple
messages with very little meta data (just timestamp and value), and keeping
meta data like the name/host/app that generated metric out of the message,
and have that be embodied in the name of the topic itself instead.
 Alternatively, we could have a relatively small number of topics, which
contain messages which include source meta data along with the timestamp
and metric value in each message.

1. On one hand, we'd have a large number of topics (say several hundred
thousand topics) with small messages, generated at a steady rate (say one
every 10 seconds).

2. Alternatively, we could have just few topics, which receive several
hundred thousand messages every 10 seconds, which contain 2 or 3 times more
data per message.

I'm wondering if kafka has any performance characteristics that differ for
the 2 scenarios.

I like #1 because it simplifies targeted message consumption, and enables
more interesting use of TopicFilter'ing.  But I'm unsure whether there
might be performance concerns with kafka (does it have to do more work to
separately manage each topic?).  Is this a common use case, or not?

Thanks for any insight.

Jason

Re: implications of using large number of topics....

Posted by Taylor Gautier <tg...@gmail.com>.
I was going for a model where by any "object" could be a pub sub topic, so
imagine a user, a piece of data owned by a user and so on.  This makes
publishing changes and listening for them "trivial".  The intent was to
make a single pub/sub bus to rule them all - that extended out all the way
to the client on the browser/mobile device.  It's not feasible (nor even
possible) to send a stream of some set of messages that the client may be
interested in and do the final filtering on the client because that may
mean the client gets messages it should not see (for security or privacy
reasons) and it also is inefficient.  There are obviously a couple of ways
one can implement sending only the messages that the client is interested
in, I went with a model that the client explicitly asks for exactly what it
wants.

I decided that to "keep it simple" it would make sense to extend this model
all the way to Kafka.  It ended up having the implications I illustrate
below, but aside from those complications we didn't have to implement any
complex routing at any other layer so that was the tradeoff we made (that
countered against the extra development we had to do to that I already
described).  In the end, I think it was a good tradeoff, but obviously it
came with a cost.

As for flush and latency, Kafka does not make messages available to
consumers until they have been flushed to disk, so flushing to disk
directly affects the latency of point to point message delivery.  Since the
system we were building was intended for near real-time updates to end
users on the site, low latency (sub 200 ms) was important.

Btw, I want to stress that I went with our adoption of Kafka knowing full
well that it was not intended to be used as I used it.  I did it anyway
because I figured if it would work for that use case, then for sure it
would work for the more "trivial" one of transmitting 100's of thousands of
logs files per second.


On Thu, Oct 11, 2012 at 9:24 AM, Jason Rosenberg <jb...@squareup.com> wrote:

> Taylor,
>
> Thanks for the detailed response.  I'd be interested to know why you
> thought it important to be able to support a large number of topics (as I'm
> contemplating as well).  What was your value proposition for that (it seems
> like you've gone to great lengths to make it work).
>
> I'm curious, you mention in several places the concern about the threshold
> for flushing data to disk.  And it seems you are saying that flushing
> sooner than later is desirable.  Why is this?  I would have thought keeping
> things in memory longer would be more efficient, etc.
>
> Thanks,
>
> Jason
>
>
> On Wed, Oct 10, 2012 at 8:13 PM, Taylor Gautier <tg...@gmail.com>
> wrote:
>
> > We used pattern #1 at Tagged.  I wouldn't recommend it unless you're
> really
> > committed.  It took a lot of work to get it working right.
> >
> > a) Performance degraded non-linearly (read it fell off a cliff) when
> > brokers were managing more than about 20k topics.  This was on a Linux
> RHEL
> > 5.3 system with EXT3.  YMMV.
> >
> > b) Startup time is significantly longer for a broker that is restarted
> due
> > to communication with ZK to sync up on those topics.
> >
> > c) If topics are short lived, even if Kafka expires the data segments
> using
> > it's standard 0.7 cleaner, the directory name for the topic will still
> > exist on disk and the topic is still considered "active" (in memory) in
> > Kafka.  This causes problems - see a above (open file handles).
> >
> > d) Message latency is affected.  Kafka syncs messages to disk if x
> messages
> > have buffered in memory, or y seconds have elapsed (both configurable).
>  If
> > you have few topics and many messages (pattern #2), you will be hitting
> the
> > x limit quite often, and get good throughput.  However, if you have many
> > topics and few messages per topic (pattern #1), you will have to rely on
> > the y threshold to flush to disk, and setting this too low can impact
> > performance (throughput) in a significant way.  Jay already mentioned
> this
> > as random writes.
> >
> > We had to implement a number of solutions ourselves to resolve these
> > issues, namely:
> >
> > #1 No ZK.  This means that all of the automatic partitioning done by
> Kafka
> > is not available, so we had to roll our own (luckily Tagged is pretty
> used
> > to scaling systems so there was much in-house expertise).  The solution
> > here was to implement a R/W proxy layer of machines to intercept messages
> > and read/write to/from Kafka handling the sharding at the proxy layer.
> >  Because most of our messages were coming from PHP and we didn't want to
> > use TCP we needed a UDP/TCP bridge/proxy anyway so this wasn't a huge
> deal
> > (also, we wanted strict ordering of messages, so we needed a shard by
> topic
> > feature anyway (I believe this can be done in 0.7 but we started with
> 0.6)
> >
> > #2 Custom cleaner.  We implemented an extra cleanup task inside the Kafka
> > process that could completely remove a topic from memory and disk.  For
> > clients, this sometimes meant that a subscribed topic suddenly changed
> it's
> > physical offset from some offset X to 0, but that's ok, while technically
> > it probably would never happen theoretically clients should have to
> handle
> > this case anyway because the Kafka physical message space is limited to
> > 64-bits (again, unlikely to ever wrap in practice, but you never know).
> >  Anyway it's pretty easy to handle this just catch the "invalid offset"
> > error Kafka gives and start at 0.
> >
> > #3 Low threshold for flush.  This gave us good latency, but poor
> throughput
> > (relatively speaking).  We had more than enough throughput, but it was
> > nowhere near what Kafka can do when setup in pattern #1.
> >
> > Given that you want to manage "hundreds of thousands of topics" that may
> > mean that you would need 10's of Kafka brokers which could be another
> > source of problems - it's more cost, more management, and more sources of
> > failure.  SSD's may help solve this problem btw, but now you are talking
> > expensive machines rather than using just off the shelf cheapo servers
> with
> > standard SATA drives.
> >
> > On Wed, Oct 10, 2012 at 4:25 PM, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > Yes the footprint of a topic is one directory per partition (a topic
> can
> > > have many subpartitions per partitions). Each directory contains one or
> > > more files (depending on how much data you are retaining and the
> segment
> > > size, both configurable).
> > >
> > > In addition to having lots of open files, which certainly scales up to
> > the
> > > hundreds of thousands, this will also impact the I/O pattern. As the
> > number
> > > of files increases the data written to each file necessarily decreases.
> > > This likely means lots of random I/O. The OS can group together writes,
> > but
> > > if you only doing a single write per topic every now and then there
> will
> > be
> > > nothing to group and you will lots of small random I/O. This will
> > > definitely impact throughput. I don't know where the practical limits
> are
> > > we have tested up to ~500 topics and see reasonable performance. We
> have
> > > not done serious performance testing with tens of thousands of topics
> or
> > > more.
> > >
> > > In addition to the filesystem concerns there is metadata kept for each
> > > partition in zk, and I believe zk keeps this metadata in memory.
> > >
> > > -Jay
> > >
> > > On Wed, Oct 10, 2012 at 4:12 PM, Jason Rosenberg <jb...@squareup.com>
> > wrote:
> > >
> > > > Ok,
> > > >
> > > > Perhaps for the sake of argument, consider the question if we have
> > just 1
> > > > kafka broker.  It sounds like it will need to keep a file handle open
> > for
> > > > each topic?  Is that right?
> > > >
> > > > Jason
> > > >
> > > > On Wed, Oct 10, 2012 at 4:05 PM, Neha Narkhede <
> > neha.narkhede@gmail.com
> > > > >wrote:
> > > >
> > > > > Hi Jason,
> > > > >
> > > > > We use option #2 at LinkedIn for metrics and tracking data.
> > Supporting
> > > > > Option #1 in Kafka 0.7 has its challenges since every topic is
> stored
> > > > > on every broker, by design. Hence, the number of topics a cluster
> can
> > > > > support is limited by the IO and number of open file handles on
> each
> > > > > broker. After Kafka 0.8 is released, the distribution of topics to
> > > > > brokers is user defined and can scale out with the number of
> brokers.
> > > > > Having said that, some Kafka users have successfully deployed Kafka
> > > > > 0.7 clusters hosting very high number of topics. I hope they can
> > share
> > > > > their experiences here.
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > >
> > > > > On Wed, Oct 10, 2012 at 3:57 PM, Jason Rosenberg <jbr@squareup.com
> >
> > > > wrote:
> > > > > > Hi,
> > > > > >
> > > > > > I'm exploring using kafka for the first time.
> > > > > >
> > > > > > I'm contemplating a system where we transmit metric data at
> regular
> > > > > > intervals to kafka.  One question I have is whether to generate
> > > simple
> > > > > > messages with very little meta data (just timestamp and value),
> and
> > > > > keeping
> > > > > > meta data like the name/host/app that generated metric out of the
> > > > > message,
> > > > > > and have that be embodied in the name of the topic itself
> instead.
> > > > > >  Alternatively, we could have a relatively small number of
> topics,
> > > > which
> > > > > > contain messages which include source meta data along with the
> > > > timestamp
> > > > > > and metric value in each message.
> > > > > >
> > > > > > 1. On one hand, we'd have a large number of topics (say several
> > > hundred
> > > > > > thousand topics) with small messages, generated at a steady rate
> > (say
> > > > one
> > > > > > every 10 seconds).
> > > > > >
> > > > > > 2. Alternatively, we could have just few topics, which receive
> > > several
> > > > > > hundred thousand messages every 10 seconds, which contain 2 or 3
> > > times
> > > > > more
> > > > > > data per message.
> > > > > >
> > > > > > I'm wondering if kafka has any performance characteristics that
> > > differ
> > > > > for
> > > > > > the 2 scenarios.
> > > > > >
> > > > > > I like #1 because it simplifies targeted message consumption, and
> > > > enables
> > > > > > more interesting use of TopicFilter'ing.  But I'm unsure whether
> > > there
> > > > > > might be performance concerns with kafka (does it have to do more
> > > work
> > > > to
> > > > > > separately manage each topic?).  Is this a common use case, or
> not?
> > > > > >
> > > > > > Thanks for any insight.
> > > > > >
> > > > > > Jason
> > > > >
> > > >
> > >
> >
>

Re: implications of using large number of topics....

Posted by Jason Rosenberg <jb...@squareup.com>.
Taylor,

Thanks for the detailed response.  I'd be interested to know why you
thought it important to be able to support a large number of topics (as I'm
contemplating as well).  What was your value proposition for that (it seems
like you've gone to great lengths to make it work).

I'm curious, you mention in several places the concern about the threshold
for flushing data to disk.  And it seems you are saying that flushing
sooner than later is desirable.  Why is this?  I would have thought keeping
things in memory longer would be more efficient, etc.

Thanks,

Jason


On Wed, Oct 10, 2012 at 8:13 PM, Taylor Gautier <tg...@gmail.com> wrote:

> We used pattern #1 at Tagged.  I wouldn't recommend it unless you're really
> committed.  It took a lot of work to get it working right.
>
> a) Performance degraded non-linearly (read it fell off a cliff) when
> brokers were managing more than about 20k topics.  This was on a Linux RHEL
> 5.3 system with EXT3.  YMMV.
>
> b) Startup time is significantly longer for a broker that is restarted due
> to communication with ZK to sync up on those topics.
>
> c) If topics are short lived, even if Kafka expires the data segments using
> it's standard 0.7 cleaner, the directory name for the topic will still
> exist on disk and the topic is still considered "active" (in memory) in
> Kafka.  This causes problems - see a above (open file handles).
>
> d) Message latency is affected.  Kafka syncs messages to disk if x messages
> have buffered in memory, or y seconds have elapsed (both configurable).  If
> you have few topics and many messages (pattern #2), you will be hitting the
> x limit quite often, and get good throughput.  However, if you have many
> topics and few messages per topic (pattern #1), you will have to rely on
> the y threshold to flush to disk, and setting this too low can impact
> performance (throughput) in a significant way.  Jay already mentioned this
> as random writes.
>
> We had to implement a number of solutions ourselves to resolve these
> issues, namely:
>
> #1 No ZK.  This means that all of the automatic partitioning done by Kafka
> is not available, so we had to roll our own (luckily Tagged is pretty used
> to scaling systems so there was much in-house expertise).  The solution
> here was to implement a R/W proxy layer of machines to intercept messages
> and read/write to/from Kafka handling the sharding at the proxy layer.
>  Because most of our messages were coming from PHP and we didn't want to
> use TCP we needed a UDP/TCP bridge/proxy anyway so this wasn't a huge deal
> (also, we wanted strict ordering of messages, so we needed a shard by topic
> feature anyway (I believe this can be done in 0.7 but we started with 0.6)
>
> #2 Custom cleaner.  We implemented an extra cleanup task inside the Kafka
> process that could completely remove a topic from memory and disk.  For
> clients, this sometimes meant that a subscribed topic suddenly changed it's
> physical offset from some offset X to 0, but that's ok, while technically
> it probably would never happen theoretically clients should have to handle
> this case anyway because the Kafka physical message space is limited to
> 64-bits (again, unlikely to ever wrap in practice, but you never know).
>  Anyway it's pretty easy to handle this just catch the "invalid offset"
> error Kafka gives and start at 0.
>
> #3 Low threshold for flush.  This gave us good latency, but poor throughput
> (relatively speaking).  We had more than enough throughput, but it was
> nowhere near what Kafka can do when setup in pattern #1.
>
> Given that you want to manage "hundreds of thousands of topics" that may
> mean that you would need 10's of Kafka brokers which could be another
> source of problems - it's more cost, more management, and more sources of
> failure.  SSD's may help solve this problem btw, but now you are talking
> expensive machines rather than using just off the shelf cheapo servers with
> standard SATA drives.
>
> On Wed, Oct 10, 2012 at 4:25 PM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Yes the footprint of a topic is one directory per partition (a topic can
> > have many subpartitions per partitions). Each directory contains one or
> > more files (depending on how much data you are retaining and the segment
> > size, both configurable).
> >
> > In addition to having lots of open files, which certainly scales up to
> the
> > hundreds of thousands, this will also impact the I/O pattern. As the
> number
> > of files increases the data written to each file necessarily decreases.
> > This likely means lots of random I/O. The OS can group together writes,
> but
> > if you only doing a single write per topic every now and then there will
> be
> > nothing to group and you will lots of small random I/O. This will
> > definitely impact throughput. I don't know where the practical limits are
> > we have tested up to ~500 topics and see reasonable performance. We have
> > not done serious performance testing with tens of thousands of topics or
> > more.
> >
> > In addition to the filesystem concerns there is metadata kept for each
> > partition in zk, and I believe zk keeps this metadata in memory.
> >
> > -Jay
> >
> > On Wed, Oct 10, 2012 at 4:12 PM, Jason Rosenberg <jb...@squareup.com>
> wrote:
> >
> > > Ok,
> > >
> > > Perhaps for the sake of argument, consider the question if we have
> just 1
> > > kafka broker.  It sounds like it will need to keep a file handle open
> for
> > > each topic?  Is that right?
> > >
> > > Jason
> > >
> > > On Wed, Oct 10, 2012 at 4:05 PM, Neha Narkhede <
> neha.narkhede@gmail.com
> > > >wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > We use option #2 at LinkedIn for metrics and tracking data.
> Supporting
> > > > Option #1 in Kafka 0.7 has its challenges since every topic is stored
> > > > on every broker, by design. Hence, the number of topics a cluster can
> > > > support is limited by the IO and number of open file handles on each
> > > > broker. After Kafka 0.8 is released, the distribution of topics to
> > > > brokers is user defined and can scale out with the number of brokers.
> > > > Having said that, some Kafka users have successfully deployed Kafka
> > > > 0.7 clusters hosting very high number of topics. I hope they can
> share
> > > > their experiences here.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > > On Wed, Oct 10, 2012 at 3:57 PM, Jason Rosenberg <jb...@squareup.com>
> > > wrote:
> > > > > Hi,
> > > > >
> > > > > I'm exploring using kafka for the first time.
> > > > >
> > > > > I'm contemplating a system where we transmit metric data at regular
> > > > > intervals to kafka.  One question I have is whether to generate
> > simple
> > > > > messages with very little meta data (just timestamp and value), and
> > > > keeping
> > > > > meta data like the name/host/app that generated metric out of the
> > > > message,
> > > > > and have that be embodied in the name of the topic itself instead.
> > > > >  Alternatively, we could have a relatively small number of topics,
> > > which
> > > > > contain messages which include source meta data along with the
> > > timestamp
> > > > > and metric value in each message.
> > > > >
> > > > > 1. On one hand, we'd have a large number of topics (say several
> > hundred
> > > > > thousand topics) with small messages, generated at a steady rate
> (say
> > > one
> > > > > every 10 seconds).
> > > > >
> > > > > 2. Alternatively, we could have just few topics, which receive
> > several
> > > > > hundred thousand messages every 10 seconds, which contain 2 or 3
> > times
> > > > more
> > > > > data per message.
> > > > >
> > > > > I'm wondering if kafka has any performance characteristics that
> > differ
> > > > for
> > > > > the 2 scenarios.
> > > > >
> > > > > I like #1 because it simplifies targeted message consumption, and
> > > enables
> > > > > more interesting use of TopicFilter'ing.  But I'm unsure whether
> > there
> > > > > might be performance concerns with kafka (does it have to do more
> > work
> > > to
> > > > > separately manage each topic?).  Is this a common use case, or not?
> > > > >
> > > > > Thanks for any insight.
> > > > >
> > > > > Jason
> > > >
> > >
> >
>

Re: implications of using large number of topics....

Posted by Jun Rao <ju...@gmail.com>.
The replication code is in the 0.8 branch. The exact release date is not
planned yet, likely towards the end of the year. However, we hope that we
can stabilize it in the next 2-3 weeks so that people can start trying it.

Thanks,

Jun

On Sat, Oct 13, 2012 at 11:42 PM, Jason Rosenberg <jb...@squareup.com> wrote:

> Cool,
>
> What's the schedule for 0.8 coming out?  Are there any pre-release
> versions?
>
> Jason
>
> On Sat, Oct 13, 2012 at 8:37 PM, Jun Rao <ju...@gmail.com> wrote:
>
> > Jason,
> >
> > The issue with 0.7 is that a topic exists on every broker and every time
> > one adds a new broker, some additional partitions for each existing topic
> > are added to the new broker. This is going to change in 0.8. A topic has
> a
> > fixed number of partitions, independent of the # of brokers. So, by
> adding
> > more brokers, we can support more topics in a cluster.
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Oct 12, 2012 at 10:55 AM, Jason Rosenberg <jb...@squareup.com>
> > wrote:
> >
> > > Has there ever been a thought to better handle a large number of
> topics?
> > >  Prior discussions?  Or would it likely be too great of a change to the
> > way
> > > kafka works, no matter what?
> > >
> > > I'm wondering if there's a way to have a notion of multiple "virtual"
> > > topics which are internally managed as members of a single topic
> "group",
> > > but which at the api level, appear to be unique topics, from the client
> > > perspective.
> > >
> > > Naturally, it would be straightforward to implement something like this
> > by
> > > wrapping the current client apis, but I'm wondering if there's any
> > benefit
> > > to building it into the internals.  This would still have the downside
> > that
> > > a client subscribing to a virtual topic would have to, under the
> covers,
> > > sift through lots of messages it's not interested in.
> > >
> > > Any other interesting approaches?
> > >
> > > Jason
> > >
> > >
> > > On Thu, Oct 11, 2012 at 10:48 PM, Jun Rao <ju...@gmail.com> wrote:
> > >
> > > > Mathias,
> > > >
> > > > What matters is the total # partitions since each corresponds to a
> > > separate
> > > > directory on disk. It doesn't matter how may topics those partitions
> > are
> > > > from.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Oct 11, 2012 at 6:43 AM, Mathias Söderberg <
> > > > mathias.soederberg@gmail.com> wrote:
> > > >
> > > > > Hey all,
> > > > >
> > > > > This is a quite interesting topic (no pun intended), and I've seen
> it
> > > > come
> > > > > up at least once before.
> > > > >
> > > > > Me and a friend started experimenting with Kafka and ZooKeeper a
> > little
> > > > > while ago (building a publisher / subscriber system with consistent
> > > > hashing
> > > > > and whatnot) and currently we're using around 300 topics, all with
> > one
> > > > > partition each. So far we haven't really done any serious
> performance
> > > > > testing, but I'm planning to do so in the following weeks. But I've
> > > got a
> > > > > few questions regardless:
> > > > >
> > > > >
> > > > > Does / should it make any difference in performance when one has a
> > lot
> > > of
> > > > > topics compared to having one topic with a lot of partitions? I'm
> > > > imagining
> > > > > that the system still needs to keep the same number of file
> > descriptors
> > > > > open, but I'm not sure how this would affect reads and writes? Are
> we
> > > > going
> > > > > to run into more random reads and writes by using a lot of topics
> > > > compared
> > > > > to using one topic with a lot of partitions instead? Can't really
> > wrap
> > > my
> > > > > head around this right now, mostly because of my rather limited
> > > knowledge
> > > > > about how disks and page caches work.
> > > > >
> > > > > Could also add that we're mostly doing sequential reads (in rare
> > cases
> > > we
> > > > > have to rewind a topic) and that the number of topics doesn't
> change.
> > > > >
> > > > > On 11 October 2012 05:13, Taylor Gautier <tg...@gmail.com>
> wrote:
> > > > >
> > > > > > We used pattern #1 at Tagged.  I wouldn't recommend it unless
> > you're
> > > > > really
> > > > > > committed.  It took a lot of work to get it working right.
> > > > > >
> > > > > > a) Performance degraded non-linearly (read it fell off a cliff)
> > when
> > > > > > brokers were managing more than about 20k topics.  This was on a
> > > Linux
> > > > > RHEL
> > > > > > 5.3 system with EXT3.  YMMV.
> > > > > >
> > > > > > b) Startup time is significantly longer for a broker that is
> > > restarted
> > > > > due
> > > > > > to communication with ZK to sync up on those topics.
> > > > > >
> > > > > > c) If topics are short lived, even if Kafka expires the data
> > segments
> > > > > using
> > > > > > it's standard 0.7 cleaner, the directory name for the topic will
> > > still
> > > > > > exist on disk and the topic is still considered "active" (in
> > memory)
> > > in
> > > > > > Kafka.  This causes problems - see a above (open file handles).
> > > > > >
> > > > > > d) Message latency is affected.  Kafka syncs messages to disk if
> x
> > > > > messages
> > > > > > have buffered in memory, or y seconds have elapsed (both
> > > configurable).
> > > > >  If
> > > > > > you have few topics and many messages (pattern #2), you will be
> > > hitting
> > > > > the
> > > > > > x limit quite often, and get good throughput.  However, if you
> have
> > > > many
> > > > > > topics and few messages per topic (pattern #1), you will have to
> > rely
> > > > on
> > > > > > the y threshold to flush to disk, and setting this too low can
> > impact
> > > > > > performance (throughput) in a significant way.  Jay already
> > mentioned
> > > > > this
> > > > > > as random writes.
> > > > > >
> > > > > > We had to implement a number of solutions ourselves to resolve
> > these
> > > > > > issues, namely:
> > > > > >
> > > > > > #1 No ZK.  This means that all of the automatic partitioning done
> > by
> > > > > Kafka
> > > > > > is not available, so we had to roll our own (luckily Tagged is
> > pretty
> > > > > used
> > > > > > to scaling systems so there was much in-house expertise).  The
> > > solution
> > > > > > here was to implement a R/W proxy layer of machines to intercept
> > > > messages
> > > > > > and read/write to/from Kafka handling the sharding at the proxy
> > > layer.
> > > > > >  Because most of our messages were coming from PHP and we didn't
> > want
> > > > to
> > > > > > use TCP we needed a UDP/TCP bridge/proxy anyway so this wasn't a
> > huge
> > > > > deal
> > > > > > (also, we wanted strict ordering of messages, so we needed a
> shard
> > by
> > > > > topic
> > > > > > feature anyway (I believe this can be done in 0.7 but we started
> > with
> > > > > 0.6)
> > > > > >
> > > > > > #2 Custom cleaner.  We implemented an extra cleanup task inside
> the
> > > > Kafka
> > > > > > process that could completely remove a topic from memory and
> disk.
> > >  For
> > > > > > clients, this sometimes meant that a subscribed topic suddenly
> > > changed
> > > > > it's
> > > > > > physical offset from some offset X to 0, but that's ok, while
> > > > technically
> > > > > > it probably would never happen theoretically clients should have
> to
> > > > > handle
> > > > > > this case anyway because the Kafka physical message space is
> > limited
> > > to
> > > > > > 64-bits (again, unlikely to ever wrap in practice, but you never
> > > know).
> > > > > >  Anyway it's pretty easy to handle this just catch the "invalid
> > > offset"
> > > > > > error Kafka gives and start at 0.
> > > > > >
> > > > > > #3 Low threshold for flush.  This gave us good latency, but poor
> > > > > throughput
> > > > > > (relatively speaking).  We had more than enough throughput, but
> it
> > > was
> > > > > > nowhere near what Kafka can do when setup in pattern #1.
> > > > > >
> > > > > > Given that you want to manage "hundreds of thousands of topics"
> > that
> > > > may
> > > > > > mean that you would need 10's of Kafka brokers which could be
> > another
> > > > > > source of problems - it's more cost, more management, and more
> > > sources
> > > > of
> > > > > > failure.  SSD's may help solve this problem btw, but now you are
> > > > talking
> > > > > > expensive machines rather than using just off the shelf cheapo
> > > servers
> > > > > with
> > > > > > standard SATA drives.
> > > > > >
> > > > > > On Wed, Oct 10, 2012 at 4:25 PM, Jay Kreps <ja...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Yes the footprint of a topic is one directory per partition (a
> > > topic
> > > > > can
> > > > > > > have many subpartitions per partitions). Each directory
> contains
> > > one
> > > > or
> > > > > > > more files (depending on how much data you are retaining and
> the
> > > > > segment
> > > > > > > size, both configurable).
> > > > > > >
> > > > > > > In addition to having lots of open files, which certainly
> scales
> > up
> > > > to
> > > > > > the
> > > > > > > hundreds of thousands, this will also impact the I/O pattern.
> As
> > > the
> > > > > > number
> > > > > > > of files increases the data written to each file necessarily
> > > > decreases.
> > > > > > > This likely means lots of random I/O. The OS can group together
> > > > writes,
> > > > > > but
> > > > > > > if you only doing a single write per topic every now and then
> > there
> > > > > will
> > > > > > be
> > > > > > > nothing to group and you will lots of small random I/O. This
> will
> > > > > > > definitely impact throughput. I don't know where the practical
> > > limits
> > > > > are
> > > > > > > we have tested up to ~500 topics and see reasonable
> performance.
> > We
> > > > > have
> > > > > > > not done serious performance testing with tens of thousands of
> > > topics
> > > > > or
> > > > > > > more.
> > > > > > >
> > > > > > > In addition to the filesystem concerns there is metadata kept
> for
> > > > each
> > > > > > > partition in zk, and I believe zk keeps this metadata in
> memory.
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > > > On Wed, Oct 10, 2012 at 4:12 PM, Jason Rosenberg <
> > jbr@squareup.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Ok,
> > > > > > > >
> > > > > > > > Perhaps for the sake of argument, consider the question if we
> > > have
> > > > > > just 1
> > > > > > > > kafka broker.  It sounds like it will need to keep a file
> > handle
> > > > open
> > > > > > for
> > > > > > > > each topic?  Is that right?
> > > > > > > >
> > > > > > > > Jason
> > > > > > > >
> > > > > > > > On Wed, Oct 10, 2012 at 4:05 PM, Neha Narkhede <
> > > > > > neha.narkhede@gmail.com
> > > > > > > > >wrote:
> > > > > > > >
> > > > > > > > > Hi Jason,
> > > > > > > > >
> > > > > > > > > We use option #2 at LinkedIn for metrics and tracking data.
> > > > > > Supporting
> > > > > > > > > Option #1 in Kafka 0.7 has its challenges since every topic
> > is
> > > > > stored
> > > > > > > > > on every broker, by design. Hence, the number of topics a
> > > cluster
> > > > > can
> > > > > > > > > support is limited by the IO and number of open file
> handles
> > on
> > > > > each
> > > > > > > > > broker. After Kafka 0.8 is released, the distribution of
> > topics
> > > > to
> > > > > > > > > brokers is user defined and can scale out with the number
> of
> > > > > brokers.
> > > > > > > > > Having said that, some Kafka users have successfully
> deployed
> > > > Kafka
> > > > > > > > > 0.7 clusters hosting very high number of topics. I hope
> they
> > > can
> > > > > > share
> > > > > > > > > their experiences here.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Neha
> > > > > > > > >
> > > > > > > > > On Wed, Oct 10, 2012 at 3:57 PM, Jason Rosenberg <
> > > > jbr@squareup.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > > Hi,
> > > > > > > > > >
> > > > > > > > > > I'm exploring using kafka for the first time.
> > > > > > > > > >
> > > > > > > > > > I'm contemplating a system where we transmit metric data
> at
> > > > > regular
> > > > > > > > > > intervals to kafka.  One question I have is whether to
> > > generate
> > > > > > > simple
> > > > > > > > > > messages with very little meta data (just timestamp and
> > > value),
> > > > > and
> > > > > > > > > keeping
> > > > > > > > > > meta data like the name/host/app that generated metric
> out
> > of
> > > > the
> > > > > > > > > message,
> > > > > > > > > > and have that be embodied in the name of the topic itself
> > > > > instead.
> > > > > > > > > >  Alternatively, we could have a relatively small number
> of
> > > > > topics,
> > > > > > > > which
> > > > > > > > > > contain messages which include source meta data along
> with
> > > the
> > > > > > > > timestamp
> > > > > > > > > > and metric value in each message.
> > > > > > > > > >
> > > > > > > > > > 1. On one hand, we'd have a large number of topics (say
> > > several
> > > > > > > hundred
> > > > > > > > > > thousand topics) with small messages, generated at a
> steady
> > > > rate
> > > > > > (say
> > > > > > > > one
> > > > > > > > > > every 10 seconds).
> > > > > > > > > >
> > > > > > > > > > 2. Alternatively, we could have just few topics, which
> > > receive
> > > > > > > several
> > > > > > > > > > hundred thousand messages every 10 seconds, which
> contain 2
> > > or
> > > > 3
> > > > > > > times
> > > > > > > > > more
> > > > > > > > > > data per message.
> > > > > > > > > >
> > > > > > > > > > I'm wondering if kafka has any performance
> characteristics
> > > that
> > > > > > > differ
> > > > > > > > > for
> > > > > > > > > > the 2 scenarios.
> > > > > > > > > >
> > > > > > > > > > I like #1 because it simplifies targeted message
> > consumption,
> > > > and
> > > > > > > > enables
> > > > > > > > > > more interesting use of TopicFilter'ing.  But I'm unsure
> > > > whether
> > > > > > > there
> > > > > > > > > > might be performance concerns with kafka (does it have to
> > do
> > > > more
> > > > > > > work
> > > > > > > > to
> > > > > > > > > > separately manage each topic?).  Is this a common use
> case,
> > > or
> > > > > not?
> > > > > > > > > >
> > > > > > > > > > Thanks for any insight.
> > > > > > > > > >
> > > > > > > > > > Jason
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: implications of using large number of topics....

Posted by Jason Rosenberg <jb...@squareup.com>.
Cool,

What's the schedule for 0.8 coming out?  Are there any pre-release versions?

Jason

On Sat, Oct 13, 2012 at 8:37 PM, Jun Rao <ju...@gmail.com> wrote:

> Jason,
>
> The issue with 0.7 is that a topic exists on every broker and every time
> one adds a new broker, some additional partitions for each existing topic
> are added to the new broker. This is going to change in 0.8. A topic has a
> fixed number of partitions, independent of the # of brokers. So, by adding
> more brokers, we can support more topics in a cluster.
>
> Thanks,
>
> Jun
>
> On Fri, Oct 12, 2012 at 10:55 AM, Jason Rosenberg <jb...@squareup.com>
> wrote:
>
> > Has there ever been a thought to better handle a large number of topics?
> >  Prior discussions?  Or would it likely be too great of a change to the
> way
> > kafka works, no matter what?
> >
> > I'm wondering if there's a way to have a notion of multiple "virtual"
> > topics which are internally managed as members of a single topic "group",
> > but which at the api level, appear to be unique topics, from the client
> > perspective.
> >
> > Naturally, it would be straightforward to implement something like this
> by
> > wrapping the current client apis, but I'm wondering if there's any
> benefit
> > to building it into the internals.  This would still have the downside
> that
> > a client subscribing to a virtual topic would have to, under the covers,
> > sift through lots of messages it's not interested in.
> >
> > Any other interesting approaches?
> >
> > Jason
> >
> >
> > On Thu, Oct 11, 2012 at 10:48 PM, Jun Rao <ju...@gmail.com> wrote:
> >
> > > Mathias,
> > >
> > > What matters is the total # partitions since each corresponds to a
> > separate
> > > directory on disk. It doesn't matter how may topics those partitions
> are
> > > from.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Oct 11, 2012 at 6:43 AM, Mathias Söderberg <
> > > mathias.soederberg@gmail.com> wrote:
> > >
> > > > Hey all,
> > > >
> > > > This is a quite interesting topic (no pun intended), and I've seen it
> > > come
> > > > up at least once before.
> > > >
> > > > Me and a friend started experimenting with Kafka and ZooKeeper a
> little
> > > > while ago (building a publisher / subscriber system with consistent
> > > hashing
> > > > and whatnot) and currently we're using around 300 topics, all with
> one
> > > > partition each. So far we haven't really done any serious performance
> > > > testing, but I'm planning to do so in the following weeks. But I've
> > got a
> > > > few questions regardless:
> > > >
> > > >
> > > > Does / should it make any difference in performance when one has a
> lot
> > of
> > > > topics compared to having one topic with a lot of partitions? I'm
> > > imagining
> > > > that the system still needs to keep the same number of file
> descriptors
> > > > open, but I'm not sure how this would affect reads and writes? Are we
> > > going
> > > > to run into more random reads and writes by using a lot of topics
> > > compared
> > > > to using one topic with a lot of partitions instead? Can't really
> wrap
> > my
> > > > head around this right now, mostly because of my rather limited
> > knowledge
> > > > about how disks and page caches work.
> > > >
> > > > Could also add that we're mostly doing sequential reads (in rare
> cases
> > we
> > > > have to rewind a topic) and that the number of topics doesn't change.
> > > >
> > > > On 11 October 2012 05:13, Taylor Gautier <tg...@gmail.com> wrote:
> > > >
> > > > > We used pattern #1 at Tagged.  I wouldn't recommend it unless
> you're
> > > > really
> > > > > committed.  It took a lot of work to get it working right.
> > > > >
> > > > > a) Performance degraded non-linearly (read it fell off a cliff)
> when
> > > > > brokers were managing more than about 20k topics.  This was on a
> > Linux
> > > > RHEL
> > > > > 5.3 system with EXT3.  YMMV.
> > > > >
> > > > > b) Startup time is significantly longer for a broker that is
> > restarted
> > > > due
> > > > > to communication with ZK to sync up on those topics.
> > > > >
> > > > > c) If topics are short lived, even if Kafka expires the data
> segments
> > > > using
> > > > > it's standard 0.7 cleaner, the directory name for the topic will
> > still
> > > > > exist on disk and the topic is still considered "active" (in
> memory)
> > in
> > > > > Kafka.  This causes problems - see a above (open file handles).
> > > > >
> > > > > d) Message latency is affected.  Kafka syncs messages to disk if x
> > > > messages
> > > > > have buffered in memory, or y seconds have elapsed (both
> > configurable).
> > > >  If
> > > > > you have few topics and many messages (pattern #2), you will be
> > hitting
> > > > the
> > > > > x limit quite often, and get good throughput.  However, if you have
> > > many
> > > > > topics and few messages per topic (pattern #1), you will have to
> rely
> > > on
> > > > > the y threshold to flush to disk, and setting this too low can
> impact
> > > > > performance (throughput) in a significant way.  Jay already
> mentioned
> > > > this
> > > > > as random writes.
> > > > >
> > > > > We had to implement a number of solutions ourselves to resolve
> these
> > > > > issues, namely:
> > > > >
> > > > > #1 No ZK.  This means that all of the automatic partitioning done
> by
> > > > Kafka
> > > > > is not available, so we had to roll our own (luckily Tagged is
> pretty
> > > > used
> > > > > to scaling systems so there was much in-house expertise).  The
> > solution
> > > > > here was to implement a R/W proxy layer of machines to intercept
> > > messages
> > > > > and read/write to/from Kafka handling the sharding at the proxy
> > layer.
> > > > >  Because most of our messages were coming from PHP and we didn't
> want
> > > to
> > > > > use TCP we needed a UDP/TCP bridge/proxy anyway so this wasn't a
> huge
> > > > deal
> > > > > (also, we wanted strict ordering of messages, so we needed a shard
> by
> > > > topic
> > > > > feature anyway (I believe this can be done in 0.7 but we started
> with
> > > > 0.6)
> > > > >
> > > > > #2 Custom cleaner.  We implemented an extra cleanup task inside the
> > > Kafka
> > > > > process that could completely remove a topic from memory and disk.
> >  For
> > > > > clients, this sometimes meant that a subscribed topic suddenly
> > changed
> > > > it's
> > > > > physical offset from some offset X to 0, but that's ok, while
> > > technically
> > > > > it probably would never happen theoretically clients should have to
> > > > handle
> > > > > this case anyway because the Kafka physical message space is
> limited
> > to
> > > > > 64-bits (again, unlikely to ever wrap in practice, but you never
> > know).
> > > > >  Anyway it's pretty easy to handle this just catch the "invalid
> > offset"
> > > > > error Kafka gives and start at 0.
> > > > >
> > > > > #3 Low threshold for flush.  This gave us good latency, but poor
> > > > throughput
> > > > > (relatively speaking).  We had more than enough throughput, but it
> > was
> > > > > nowhere near what Kafka can do when setup in pattern #1.
> > > > >
> > > > > Given that you want to manage "hundreds of thousands of topics"
> that
> > > may
> > > > > mean that you would need 10's of Kafka brokers which could be
> another
> > > > > source of problems - it's more cost, more management, and more
> > sources
> > > of
> > > > > failure.  SSD's may help solve this problem btw, but now you are
> > > talking
> > > > > expensive machines rather than using just off the shelf cheapo
> > servers
> > > > with
> > > > > standard SATA drives.
> > > > >
> > > > > On Wed, Oct 10, 2012 at 4:25 PM, Jay Kreps <ja...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Yes the footprint of a topic is one directory per partition (a
> > topic
> > > > can
> > > > > > have many subpartitions per partitions). Each directory contains
> > one
> > > or
> > > > > > more files (depending on how much data you are retaining and the
> > > > segment
> > > > > > size, both configurable).
> > > > > >
> > > > > > In addition to having lots of open files, which certainly scales
> up
> > > to
> > > > > the
> > > > > > hundreds of thousands, this will also impact the I/O pattern. As
> > the
> > > > > number
> > > > > > of files increases the data written to each file necessarily
> > > decreases.
> > > > > > This likely means lots of random I/O. The OS can group together
> > > writes,
> > > > > but
> > > > > > if you only doing a single write per topic every now and then
> there
> > > > will
> > > > > be
> > > > > > nothing to group and you will lots of small random I/O. This will
> > > > > > definitely impact throughput. I don't know where the practical
> > limits
> > > > are
> > > > > > we have tested up to ~500 topics and see reasonable performance.
> We
> > > > have
> > > > > > not done serious performance testing with tens of thousands of
> > topics
> > > > or
> > > > > > more.
> > > > > >
> > > > > > In addition to the filesystem concerns there is metadata kept for
> > > each
> > > > > > partition in zk, and I believe zk keeps this metadata in memory.
> > > > > >
> > > > > > -Jay
> > > > > >
> > > > > > On Wed, Oct 10, 2012 at 4:12 PM, Jason Rosenberg <
> jbr@squareup.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Ok,
> > > > > > >
> > > > > > > Perhaps for the sake of argument, consider the question if we
> > have
> > > > > just 1
> > > > > > > kafka broker.  It sounds like it will need to keep a file
> handle
> > > open
> > > > > for
> > > > > > > each topic?  Is that right?
> > > > > > >
> > > > > > > Jason
> > > > > > >
> > > > > > > On Wed, Oct 10, 2012 at 4:05 PM, Neha Narkhede <
> > > > > neha.narkhede@gmail.com
> > > > > > > >wrote:
> > > > > > >
> > > > > > > > Hi Jason,
> > > > > > > >
> > > > > > > > We use option #2 at LinkedIn for metrics and tracking data.
> > > > > Supporting
> > > > > > > > Option #1 in Kafka 0.7 has its challenges since every topic
> is
> > > > stored
> > > > > > > > on every broker, by design. Hence, the number of topics a
> > cluster
> > > > can
> > > > > > > > support is limited by the IO and number of open file handles
> on
> > > > each
> > > > > > > > broker. After Kafka 0.8 is released, the distribution of
> topics
> > > to
> > > > > > > > brokers is user defined and can scale out with the number of
> > > > brokers.
> > > > > > > > Having said that, some Kafka users have successfully deployed
> > > Kafka
> > > > > > > > 0.7 clusters hosting very high number of topics. I hope they
> > can
> > > > > share
> > > > > > > > their experiences here.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Neha
> > > > > > > >
> > > > > > > > On Wed, Oct 10, 2012 at 3:57 PM, Jason Rosenberg <
> > > jbr@squareup.com
> > > > >
> > > > > > > wrote:
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > I'm exploring using kafka for the first time.
> > > > > > > > >
> > > > > > > > > I'm contemplating a system where we transmit metric data at
> > > > regular
> > > > > > > > > intervals to kafka.  One question I have is whether to
> > generate
> > > > > > simple
> > > > > > > > > messages with very little meta data (just timestamp and
> > value),
> > > > and
> > > > > > > > keeping
> > > > > > > > > meta data like the name/host/app that generated metric out
> of
> > > the
> > > > > > > > message,
> > > > > > > > > and have that be embodied in the name of the topic itself
> > > > instead.
> > > > > > > > >  Alternatively, we could have a relatively small number of
> > > > topics,
> > > > > > > which
> > > > > > > > > contain messages which include source meta data along with
> > the
> > > > > > > timestamp
> > > > > > > > > and metric value in each message.
> > > > > > > > >
> > > > > > > > > 1. On one hand, we'd have a large number of topics (say
> > several
> > > > > > hundred
> > > > > > > > > thousand topics) with small messages, generated at a steady
> > > rate
> > > > > (say
> > > > > > > one
> > > > > > > > > every 10 seconds).
> > > > > > > > >
> > > > > > > > > 2. Alternatively, we could have just few topics, which
> > receive
> > > > > > several
> > > > > > > > > hundred thousand messages every 10 seconds, which contain 2
> > or
> > > 3
> > > > > > times
> > > > > > > > more
> > > > > > > > > data per message.
> > > > > > > > >
> > > > > > > > > I'm wondering if kafka has any performance characteristics
> > that
> > > > > > differ
> > > > > > > > for
> > > > > > > > > the 2 scenarios.
> > > > > > > > >
> > > > > > > > > I like #1 because it simplifies targeted message
> consumption,
> > > and
> > > > > > > enables
> > > > > > > > > more interesting use of TopicFilter'ing.  But I'm unsure
> > > whether
> > > > > > there
> > > > > > > > > might be performance concerns with kafka (does it have to
> do
> > > more
> > > > > > work
> > > > > > > to
> > > > > > > > > separately manage each topic?).  Is this a common use case,
> > or
> > > > not?
> > > > > > > > >
> > > > > > > > > Thanks for any insight.
> > > > > > > > >
> > > > > > > > > Jason
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: implications of using large number of topics....

Posted by Jun Rao <ju...@gmail.com>.
Jason,

The issue with 0.7 is that a topic exists on every broker and every time
one adds a new broker, some additional partitions for each existing topic
are added to the new broker. This is going to change in 0.8. A topic has a
fixed number of partitions, independent of the # of brokers. So, by adding
more brokers, we can support more topics in a cluster.

Thanks,

Jun

On Fri, Oct 12, 2012 at 10:55 AM, Jason Rosenberg <jb...@squareup.com> wrote:

> Has there ever been a thought to better handle a large number of topics?
>  Prior discussions?  Or would it likely be too great of a change to the way
> kafka works, no matter what?
>
> I'm wondering if there's a way to have a notion of multiple "virtual"
> topics which are internally managed as members of a single topic "group",
> but which at the api level, appear to be unique topics, from the client
> perspective.
>
> Naturally, it would be straightforward to implement something like this by
> wrapping the current client apis, but I'm wondering if there's any benefit
> to building it into the internals.  This would still have the downside that
> a client subscribing to a virtual topic would have to, under the covers,
> sift through lots of messages it's not interested in.
>
> Any other interesting approaches?
>
> Jason
>
>
> On Thu, Oct 11, 2012 at 10:48 PM, Jun Rao <ju...@gmail.com> wrote:
>
> > Mathias,
> >
> > What matters is the total # partitions since each corresponds to a
> separate
> > directory on disk. It doesn't matter how may topics those partitions are
> > from.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Oct 11, 2012 at 6:43 AM, Mathias Söderberg <
> > mathias.soederberg@gmail.com> wrote:
> >
> > > Hey all,
> > >
> > > This is a quite interesting topic (no pun intended), and I've seen it
> > come
> > > up at least once before.
> > >
> > > Me and a friend started experimenting with Kafka and ZooKeeper a little
> > > while ago (building a publisher / subscriber system with consistent
> > hashing
> > > and whatnot) and currently we're using around 300 topics, all with one
> > > partition each. So far we haven't really done any serious performance
> > > testing, but I'm planning to do so in the following weeks. But I've
> got a
> > > few questions regardless:
> > >
> > >
> > > Does / should it make any difference in performance when one has a lot
> of
> > > topics compared to having one topic with a lot of partitions? I'm
> > imagining
> > > that the system still needs to keep the same number of file descriptors
> > > open, but I'm not sure how this would affect reads and writes? Are we
> > going
> > > to run into more random reads and writes by using a lot of topics
> > compared
> > > to using one topic with a lot of partitions instead? Can't really wrap
> my
> > > head around this right now, mostly because of my rather limited
> knowledge
> > > about how disks and page caches work.
> > >
> > > Could also add that we're mostly doing sequential reads (in rare cases
> we
> > > have to rewind a topic) and that the number of topics doesn't change.
> > >
> > > On 11 October 2012 05:13, Taylor Gautier <tg...@gmail.com> wrote:
> > >
> > > > We used pattern #1 at Tagged.  I wouldn't recommend it unless you're
> > > really
> > > > committed.  It took a lot of work to get it working right.
> > > >
> > > > a) Performance degraded non-linearly (read it fell off a cliff) when
> > > > brokers were managing more than about 20k topics.  This was on a
> Linux
> > > RHEL
> > > > 5.3 system with EXT3.  YMMV.
> > > >
> > > > b) Startup time is significantly longer for a broker that is
> restarted
> > > due
> > > > to communication with ZK to sync up on those topics.
> > > >
> > > > c) If topics are short lived, even if Kafka expires the data segments
> > > using
> > > > it's standard 0.7 cleaner, the directory name for the topic will
> still
> > > > exist on disk and the topic is still considered "active" (in memory)
> in
> > > > Kafka.  This causes problems - see a above (open file handles).
> > > >
> > > > d) Message latency is affected.  Kafka syncs messages to disk if x
> > > messages
> > > > have buffered in memory, or y seconds have elapsed (both
> configurable).
> > >  If
> > > > you have few topics and many messages (pattern #2), you will be
> hitting
> > > the
> > > > x limit quite often, and get good throughput.  However, if you have
> > many
> > > > topics and few messages per topic (pattern #1), you will have to rely
> > on
> > > > the y threshold to flush to disk, and setting this too low can impact
> > > > performance (throughput) in a significant way.  Jay already mentioned
> > > this
> > > > as random writes.
> > > >
> > > > We had to implement a number of solutions ourselves to resolve these
> > > > issues, namely:
> > > >
> > > > #1 No ZK.  This means that all of the automatic partitioning done by
> > > Kafka
> > > > is not available, so we had to roll our own (luckily Tagged is pretty
> > > used
> > > > to scaling systems so there was much in-house expertise).  The
> solution
> > > > here was to implement a R/W proxy layer of machines to intercept
> > messages
> > > > and read/write to/from Kafka handling the sharding at the proxy
> layer.
> > > >  Because most of our messages were coming from PHP and we didn't want
> > to
> > > > use TCP we needed a UDP/TCP bridge/proxy anyway so this wasn't a huge
> > > deal
> > > > (also, we wanted strict ordering of messages, so we needed a shard by
> > > topic
> > > > feature anyway (I believe this can be done in 0.7 but we started with
> > > 0.6)
> > > >
> > > > #2 Custom cleaner.  We implemented an extra cleanup task inside the
> > Kafka
> > > > process that could completely remove a topic from memory and disk.
>  For
> > > > clients, this sometimes meant that a subscribed topic suddenly
> changed
> > > it's
> > > > physical offset from some offset X to 0, but that's ok, while
> > technically
> > > > it probably would never happen theoretically clients should have to
> > > handle
> > > > this case anyway because the Kafka physical message space is limited
> to
> > > > 64-bits (again, unlikely to ever wrap in practice, but you never
> know).
> > > >  Anyway it's pretty easy to handle this just catch the "invalid
> offset"
> > > > error Kafka gives and start at 0.
> > > >
> > > > #3 Low threshold for flush.  This gave us good latency, but poor
> > > throughput
> > > > (relatively speaking).  We had more than enough throughput, but it
> was
> > > > nowhere near what Kafka can do when setup in pattern #1.
> > > >
> > > > Given that you want to manage "hundreds of thousands of topics" that
> > may
> > > > mean that you would need 10's of Kafka brokers which could be another
> > > > source of problems - it's more cost, more management, and more
> sources
> > of
> > > > failure.  SSD's may help solve this problem btw, but now you are
> > talking
> > > > expensive machines rather than using just off the shelf cheapo
> servers
> > > with
> > > > standard SATA drives.
> > > >
> > > > On Wed, Oct 10, 2012 at 4:25 PM, Jay Kreps <ja...@gmail.com>
> > wrote:
> > > >
> > > > > Yes the footprint of a topic is one directory per partition (a
> topic
> > > can
> > > > > have many subpartitions per partitions). Each directory contains
> one
> > or
> > > > > more files (depending on how much data you are retaining and the
> > > segment
> > > > > size, both configurable).
> > > > >
> > > > > In addition to having lots of open files, which certainly scales up
> > to
> > > > the
> > > > > hundreds of thousands, this will also impact the I/O pattern. As
> the
> > > > number
> > > > > of files increases the data written to each file necessarily
> > decreases.
> > > > > This likely means lots of random I/O. The OS can group together
> > writes,
> > > > but
> > > > > if you only doing a single write per topic every now and then there
> > > will
> > > > be
> > > > > nothing to group and you will lots of small random I/O. This will
> > > > > definitely impact throughput. I don't know where the practical
> limits
> > > are
> > > > > we have tested up to ~500 topics and see reasonable performance. We
> > > have
> > > > > not done serious performance testing with tens of thousands of
> topics
> > > or
> > > > > more.
> > > > >
> > > > > In addition to the filesystem concerns there is metadata kept for
> > each
> > > > > partition in zk, and I believe zk keeps this metadata in memory.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Wed, Oct 10, 2012 at 4:12 PM, Jason Rosenberg <jbr@squareup.com
> >
> > > > wrote:
> > > > >
> > > > > > Ok,
> > > > > >
> > > > > > Perhaps for the sake of argument, consider the question if we
> have
> > > > just 1
> > > > > > kafka broker.  It sounds like it will need to keep a file handle
> > open
> > > > for
> > > > > > each topic?  Is that right?
> > > > > >
> > > > > > Jason
> > > > > >
> > > > > > On Wed, Oct 10, 2012 at 4:05 PM, Neha Narkhede <
> > > > neha.narkhede@gmail.com
> > > > > > >wrote:
> > > > > >
> > > > > > > Hi Jason,
> > > > > > >
> > > > > > > We use option #2 at LinkedIn for metrics and tracking data.
> > > > Supporting
> > > > > > > Option #1 in Kafka 0.7 has its challenges since every topic is
> > > stored
> > > > > > > on every broker, by design. Hence, the number of topics a
> cluster
> > > can
> > > > > > > support is limited by the IO and number of open file handles on
> > > each
> > > > > > > broker. After Kafka 0.8 is released, the distribution of topics
> > to
> > > > > > > brokers is user defined and can scale out with the number of
> > > brokers.
> > > > > > > Having said that, some Kafka users have successfully deployed
> > Kafka
> > > > > > > 0.7 clusters hosting very high number of topics. I hope they
> can
> > > > share
> > > > > > > their experiences here.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Neha
> > > > > > >
> > > > > > > On Wed, Oct 10, 2012 at 3:57 PM, Jason Rosenberg <
> > jbr@squareup.com
> > > >
> > > > > > wrote:
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I'm exploring using kafka for the first time.
> > > > > > > >
> > > > > > > > I'm contemplating a system where we transmit metric data at
> > > regular
> > > > > > > > intervals to kafka.  One question I have is whether to
> generate
> > > > > simple
> > > > > > > > messages with very little meta data (just timestamp and
> value),
> > > and
> > > > > > > keeping
> > > > > > > > meta data like the name/host/app that generated metric out of
> > the
> > > > > > > message,
> > > > > > > > and have that be embodied in the name of the topic itself
> > > instead.
> > > > > > > >  Alternatively, we could have a relatively small number of
> > > topics,
> > > > > > which
> > > > > > > > contain messages which include source meta data along with
> the
> > > > > > timestamp
> > > > > > > > and metric value in each message.
> > > > > > > >
> > > > > > > > 1. On one hand, we'd have a large number of topics (say
> several
> > > > > hundred
> > > > > > > > thousand topics) with small messages, generated at a steady
> > rate
> > > > (say
> > > > > > one
> > > > > > > > every 10 seconds).
> > > > > > > >
> > > > > > > > 2. Alternatively, we could have just few topics, which
> receive
> > > > > several
> > > > > > > > hundred thousand messages every 10 seconds, which contain 2
> or
> > 3
> > > > > times
> > > > > > > more
> > > > > > > > data per message.
> > > > > > > >
> > > > > > > > I'm wondering if kafka has any performance characteristics
> that
> > > > > differ
> > > > > > > for
> > > > > > > > the 2 scenarios.
> > > > > > > >
> > > > > > > > I like #1 because it simplifies targeted message consumption,
> > and
> > > > > > enables
> > > > > > > > more interesting use of TopicFilter'ing.  But I'm unsure
> > whether
> > > > > there
> > > > > > > > might be performance concerns with kafka (does it have to do
> > more
> > > > > work
> > > > > > to
> > > > > > > > separately manage each topic?).  Is this a common use case,
> or
> > > not?
> > > > > > > >
> > > > > > > > Thanks for any insight.
> > > > > > > >
> > > > > > > > Jason
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: implications of using large number of topics....

Posted by Jason Rosenberg <jb...@squareup.com>.
Has there ever been a thought to better handle a large number of topics?
 Prior discussions?  Or would it likely be too great of a change to the way
kafka works, no matter what?

I'm wondering if there's a way to have a notion of multiple "virtual"
topics which are internally managed as members of a single topic "group",
but which at the api level, appear to be unique topics, from the client
perspective.

Naturally, it would be straightforward to implement something like this by
wrapping the current client apis, but I'm wondering if there's any benefit
to building it into the internals.  This would still have the downside that
a client subscribing to a virtual topic would have to, under the covers,
sift through lots of messages it's not interested in.

Any other interesting approaches?

Jason


On Thu, Oct 11, 2012 at 10:48 PM, Jun Rao <ju...@gmail.com> wrote:

> Mathias,
>
> What matters is the total # partitions since each corresponds to a separate
> directory on disk. It doesn't matter how may topics those partitions are
> from.
>
> Thanks,
>
> Jun
>
> On Thu, Oct 11, 2012 at 6:43 AM, Mathias Söderberg <
> mathias.soederberg@gmail.com> wrote:
>
> > Hey all,
> >
> > This is a quite interesting topic (no pun intended), and I've seen it
> come
> > up at least once before.
> >
> > Me and a friend started experimenting with Kafka and ZooKeeper a little
> > while ago (building a publisher / subscriber system with consistent
> hashing
> > and whatnot) and currently we're using around 300 topics, all with one
> > partition each. So far we haven't really done any serious performance
> > testing, but I'm planning to do so in the following weeks. But I've got a
> > few questions regardless:
> >
> >
> > Does / should it make any difference in performance when one has a lot of
> > topics compared to having one topic with a lot of partitions? I'm
> imagining
> > that the system still needs to keep the same number of file descriptors
> > open, but I'm not sure how this would affect reads and writes? Are we
> going
> > to run into more random reads and writes by using a lot of topics
> compared
> > to using one topic with a lot of partitions instead? Can't really wrap my
> > head around this right now, mostly because of my rather limited knowledge
> > about how disks and page caches work.
> >
> > Could also add that we're mostly doing sequential reads (in rare cases we
> > have to rewind a topic) and that the number of topics doesn't change.
> >
> > On 11 October 2012 05:13, Taylor Gautier <tg...@gmail.com> wrote:
> >
> > > We used pattern #1 at Tagged.  I wouldn't recommend it unless you're
> > really
> > > committed.  It took a lot of work to get it working right.
> > >
> > > a) Performance degraded non-linearly (read it fell off a cliff) when
> > > brokers were managing more than about 20k topics.  This was on a Linux
> > RHEL
> > > 5.3 system with EXT3.  YMMV.
> > >
> > > b) Startup time is significantly longer for a broker that is restarted
> > due
> > > to communication with ZK to sync up on those topics.
> > >
> > > c) If topics are short lived, even if Kafka expires the data segments
> > using
> > > it's standard 0.7 cleaner, the directory name for the topic will still
> > > exist on disk and the topic is still considered "active" (in memory) in
> > > Kafka.  This causes problems - see a above (open file handles).
> > >
> > > d) Message latency is affected.  Kafka syncs messages to disk if x
> > messages
> > > have buffered in memory, or y seconds have elapsed (both configurable).
> >  If
> > > you have few topics and many messages (pattern #2), you will be hitting
> > the
> > > x limit quite often, and get good throughput.  However, if you have
> many
> > > topics and few messages per topic (pattern #1), you will have to rely
> on
> > > the y threshold to flush to disk, and setting this too low can impact
> > > performance (throughput) in a significant way.  Jay already mentioned
> > this
> > > as random writes.
> > >
> > > We had to implement a number of solutions ourselves to resolve these
> > > issues, namely:
> > >
> > > #1 No ZK.  This means that all of the automatic partitioning done by
> > Kafka
> > > is not available, so we had to roll our own (luckily Tagged is pretty
> > used
> > > to scaling systems so there was much in-house expertise).  The solution
> > > here was to implement a R/W proxy layer of machines to intercept
> messages
> > > and read/write to/from Kafka handling the sharding at the proxy layer.
> > >  Because most of our messages were coming from PHP and we didn't want
> to
> > > use TCP we needed a UDP/TCP bridge/proxy anyway so this wasn't a huge
> > deal
> > > (also, we wanted strict ordering of messages, so we needed a shard by
> > topic
> > > feature anyway (I believe this can be done in 0.7 but we started with
> > 0.6)
> > >
> > > #2 Custom cleaner.  We implemented an extra cleanup task inside the
> Kafka
> > > process that could completely remove a topic from memory and disk.  For
> > > clients, this sometimes meant that a subscribed topic suddenly changed
> > it's
> > > physical offset from some offset X to 0, but that's ok, while
> technically
> > > it probably would never happen theoretically clients should have to
> > handle
> > > this case anyway because the Kafka physical message space is limited to
> > > 64-bits (again, unlikely to ever wrap in practice, but you never know).
> > >  Anyway it's pretty easy to handle this just catch the "invalid offset"
> > > error Kafka gives and start at 0.
> > >
> > > #3 Low threshold for flush.  This gave us good latency, but poor
> > throughput
> > > (relatively speaking).  We had more than enough throughput, but it was
> > > nowhere near what Kafka can do when setup in pattern #1.
> > >
> > > Given that you want to manage "hundreds of thousands of topics" that
> may
> > > mean that you would need 10's of Kafka brokers which could be another
> > > source of problems - it's more cost, more management, and more sources
> of
> > > failure.  SSD's may help solve this problem btw, but now you are
> talking
> > > expensive machines rather than using just off the shelf cheapo servers
> > with
> > > standard SATA drives.
> > >
> > > On Wed, Oct 10, 2012 at 4:25 PM, Jay Kreps <ja...@gmail.com>
> wrote:
> > >
> > > > Yes the footprint of a topic is one directory per partition (a topic
> > can
> > > > have many subpartitions per partitions). Each directory contains one
> or
> > > > more files (depending on how much data you are retaining and the
> > segment
> > > > size, both configurable).
> > > >
> > > > In addition to having lots of open files, which certainly scales up
> to
> > > the
> > > > hundreds of thousands, this will also impact the I/O pattern. As the
> > > number
> > > > of files increases the data written to each file necessarily
> decreases.
> > > > This likely means lots of random I/O. The OS can group together
> writes,
> > > but
> > > > if you only doing a single write per topic every now and then there
> > will
> > > be
> > > > nothing to group and you will lots of small random I/O. This will
> > > > definitely impact throughput. I don't know where the practical limits
> > are
> > > > we have tested up to ~500 topics and see reasonable performance. We
> > have
> > > > not done serious performance testing with tens of thousands of topics
> > or
> > > > more.
> > > >
> > > > In addition to the filesystem concerns there is metadata kept for
> each
> > > > partition in zk, and I believe zk keeps this metadata in memory.
> > > >
> > > > -Jay
> > > >
> > > > On Wed, Oct 10, 2012 at 4:12 PM, Jason Rosenberg <jb...@squareup.com>
> > > wrote:
> > > >
> > > > > Ok,
> > > > >
> > > > > Perhaps for the sake of argument, consider the question if we have
> > > just 1
> > > > > kafka broker.  It sounds like it will need to keep a file handle
> open
> > > for
> > > > > each topic?  Is that right?
> > > > >
> > > > > Jason
> > > > >
> > > > > On Wed, Oct 10, 2012 at 4:05 PM, Neha Narkhede <
> > > neha.narkhede@gmail.com
> > > > > >wrote:
> > > > >
> > > > > > Hi Jason,
> > > > > >
> > > > > > We use option #2 at LinkedIn for metrics and tracking data.
> > > Supporting
> > > > > > Option #1 in Kafka 0.7 has its challenges since every topic is
> > stored
> > > > > > on every broker, by design. Hence, the number of topics a cluster
> > can
> > > > > > support is limited by the IO and number of open file handles on
> > each
> > > > > > broker. After Kafka 0.8 is released, the distribution of topics
> to
> > > > > > brokers is user defined and can scale out with the number of
> > brokers.
> > > > > > Having said that, some Kafka users have successfully deployed
> Kafka
> > > > > > 0.7 clusters hosting very high number of topics. I hope they can
> > > share
> > > > > > their experiences here.
> > > > > >
> > > > > > Thanks,
> > > > > > Neha
> > > > > >
> > > > > > On Wed, Oct 10, 2012 at 3:57 PM, Jason Rosenberg <
> jbr@squareup.com
> > >
> > > > > wrote:
> > > > > > > Hi,
> > > > > > >
> > > > > > > I'm exploring using kafka for the first time.
> > > > > > >
> > > > > > > I'm contemplating a system where we transmit metric data at
> > regular
> > > > > > > intervals to kafka.  One question I have is whether to generate
> > > > simple
> > > > > > > messages with very little meta data (just timestamp and value),
> > and
> > > > > > keeping
> > > > > > > meta data like the name/host/app that generated metric out of
> the
> > > > > > message,
> > > > > > > and have that be embodied in the name of the topic itself
> > instead.
> > > > > > >  Alternatively, we could have a relatively small number of
> > topics,
> > > > > which
> > > > > > > contain messages which include source meta data along with the
> > > > > timestamp
> > > > > > > and metric value in each message.
> > > > > > >
> > > > > > > 1. On one hand, we'd have a large number of topics (say several
> > > > hundred
> > > > > > > thousand topics) with small messages, generated at a steady
> rate
> > > (say
> > > > > one
> > > > > > > every 10 seconds).
> > > > > > >
> > > > > > > 2. Alternatively, we could have just few topics, which receive
> > > > several
> > > > > > > hundred thousand messages every 10 seconds, which contain 2 or
> 3
> > > > times
> > > > > > more
> > > > > > > data per message.
> > > > > > >
> > > > > > > I'm wondering if kafka has any performance characteristics that
> > > > differ
> > > > > > for
> > > > > > > the 2 scenarios.
> > > > > > >
> > > > > > > I like #1 because it simplifies targeted message consumption,
> and
> > > > > enables
> > > > > > > more interesting use of TopicFilter'ing.  But I'm unsure
> whether
> > > > there
> > > > > > > might be performance concerns with kafka (does it have to do
> more
> > > > work
> > > > > to
> > > > > > > separately manage each topic?).  Is this a common use case, or
> > not?
> > > > > > >
> > > > > > > Thanks for any insight.
> > > > > > >
> > > > > > > Jason
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: implications of using large number of topics....

Posted by Jun Rao <ju...@gmail.com>.
Mathias,

What matters is the total # partitions since each corresponds to a separate
directory on disk. It doesn't matter how may topics those partitions are
from.

Thanks,

Jun

On Thu, Oct 11, 2012 at 6:43 AM, Mathias Söderberg <
mathias.soederberg@gmail.com> wrote:

> Hey all,
>
> This is a quite interesting topic (no pun intended), and I've seen it come
> up at least once before.
>
> Me and a friend started experimenting with Kafka and ZooKeeper a little
> while ago (building a publisher / subscriber system with consistent hashing
> and whatnot) and currently we're using around 300 topics, all with one
> partition each. So far we haven't really done any serious performance
> testing, but I'm planning to do so in the following weeks. But I've got a
> few questions regardless:
>
>
> Does / should it make any difference in performance when one has a lot of
> topics compared to having one topic with a lot of partitions? I'm imagining
> that the system still needs to keep the same number of file descriptors
> open, but I'm not sure how this would affect reads and writes? Are we going
> to run into more random reads and writes by using a lot of topics compared
> to using one topic with a lot of partitions instead? Can't really wrap my
> head around this right now, mostly because of my rather limited knowledge
> about how disks and page caches work.
>
> Could also add that we're mostly doing sequential reads (in rare cases we
> have to rewind a topic) and that the number of topics doesn't change.
>
> On 11 October 2012 05:13, Taylor Gautier <tg...@gmail.com> wrote:
>
> > We used pattern #1 at Tagged.  I wouldn't recommend it unless you're
> really
> > committed.  It took a lot of work to get it working right.
> >
> > a) Performance degraded non-linearly (read it fell off a cliff) when
> > brokers were managing more than about 20k topics.  This was on a Linux
> RHEL
> > 5.3 system with EXT3.  YMMV.
> >
> > b) Startup time is significantly longer for a broker that is restarted
> due
> > to communication with ZK to sync up on those topics.
> >
> > c) If topics are short lived, even if Kafka expires the data segments
> using
> > it's standard 0.7 cleaner, the directory name for the topic will still
> > exist on disk and the topic is still considered "active" (in memory) in
> > Kafka.  This causes problems - see a above (open file handles).
> >
> > d) Message latency is affected.  Kafka syncs messages to disk if x
> messages
> > have buffered in memory, or y seconds have elapsed (both configurable).
>  If
> > you have few topics and many messages (pattern #2), you will be hitting
> the
> > x limit quite often, and get good throughput.  However, if you have many
> > topics and few messages per topic (pattern #1), you will have to rely on
> > the y threshold to flush to disk, and setting this too low can impact
> > performance (throughput) in a significant way.  Jay already mentioned
> this
> > as random writes.
> >
> > We had to implement a number of solutions ourselves to resolve these
> > issues, namely:
> >
> > #1 No ZK.  This means that all of the automatic partitioning done by
> Kafka
> > is not available, so we had to roll our own (luckily Tagged is pretty
> used
> > to scaling systems so there was much in-house expertise).  The solution
> > here was to implement a R/W proxy layer of machines to intercept messages
> > and read/write to/from Kafka handling the sharding at the proxy layer.
> >  Because most of our messages were coming from PHP and we didn't want to
> > use TCP we needed a UDP/TCP bridge/proxy anyway so this wasn't a huge
> deal
> > (also, we wanted strict ordering of messages, so we needed a shard by
> topic
> > feature anyway (I believe this can be done in 0.7 but we started with
> 0.6)
> >
> > #2 Custom cleaner.  We implemented an extra cleanup task inside the Kafka
> > process that could completely remove a topic from memory and disk.  For
> > clients, this sometimes meant that a subscribed topic suddenly changed
> it's
> > physical offset from some offset X to 0, but that's ok, while technically
> > it probably would never happen theoretically clients should have to
> handle
> > this case anyway because the Kafka physical message space is limited to
> > 64-bits (again, unlikely to ever wrap in practice, but you never know).
> >  Anyway it's pretty easy to handle this just catch the "invalid offset"
> > error Kafka gives and start at 0.
> >
> > #3 Low threshold for flush.  This gave us good latency, but poor
> throughput
> > (relatively speaking).  We had more than enough throughput, but it was
> > nowhere near what Kafka can do when setup in pattern #1.
> >
> > Given that you want to manage "hundreds of thousands of topics" that may
> > mean that you would need 10's of Kafka brokers which could be another
> > source of problems - it's more cost, more management, and more sources of
> > failure.  SSD's may help solve this problem btw, but now you are talking
> > expensive machines rather than using just off the shelf cheapo servers
> with
> > standard SATA drives.
> >
> > On Wed, Oct 10, 2012 at 4:25 PM, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > Yes the footprint of a topic is one directory per partition (a topic
> can
> > > have many subpartitions per partitions). Each directory contains one or
> > > more files (depending on how much data you are retaining and the
> segment
> > > size, both configurable).
> > >
> > > In addition to having lots of open files, which certainly scales up to
> > the
> > > hundreds of thousands, this will also impact the I/O pattern. As the
> > number
> > > of files increases the data written to each file necessarily decreases.
> > > This likely means lots of random I/O. The OS can group together writes,
> > but
> > > if you only doing a single write per topic every now and then there
> will
> > be
> > > nothing to group and you will lots of small random I/O. This will
> > > definitely impact throughput. I don't know where the practical limits
> are
> > > we have tested up to ~500 topics and see reasonable performance. We
> have
> > > not done serious performance testing with tens of thousands of topics
> or
> > > more.
> > >
> > > In addition to the filesystem concerns there is metadata kept for each
> > > partition in zk, and I believe zk keeps this metadata in memory.
> > >
> > > -Jay
> > >
> > > On Wed, Oct 10, 2012 at 4:12 PM, Jason Rosenberg <jb...@squareup.com>
> > wrote:
> > >
> > > > Ok,
> > > >
> > > > Perhaps for the sake of argument, consider the question if we have
> > just 1
> > > > kafka broker.  It sounds like it will need to keep a file handle open
> > for
> > > > each topic?  Is that right?
> > > >
> > > > Jason
> > > >
> > > > On Wed, Oct 10, 2012 at 4:05 PM, Neha Narkhede <
> > neha.narkhede@gmail.com
> > > > >wrote:
> > > >
> > > > > Hi Jason,
> > > > >
> > > > > We use option #2 at LinkedIn for metrics and tracking data.
> > Supporting
> > > > > Option #1 in Kafka 0.7 has its challenges since every topic is
> stored
> > > > > on every broker, by design. Hence, the number of topics a cluster
> can
> > > > > support is limited by the IO and number of open file handles on
> each
> > > > > broker. After Kafka 0.8 is released, the distribution of topics to
> > > > > brokers is user defined and can scale out with the number of
> brokers.
> > > > > Having said that, some Kafka users have successfully deployed Kafka
> > > > > 0.7 clusters hosting very high number of topics. I hope they can
> > share
> > > > > their experiences here.
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > >
> > > > > On Wed, Oct 10, 2012 at 3:57 PM, Jason Rosenberg <jbr@squareup.com
> >
> > > > wrote:
> > > > > > Hi,
> > > > > >
> > > > > > I'm exploring using kafka for the first time.
> > > > > >
> > > > > > I'm contemplating a system where we transmit metric data at
> regular
> > > > > > intervals to kafka.  One question I have is whether to generate
> > > simple
> > > > > > messages with very little meta data (just timestamp and value),
> and
> > > > > keeping
> > > > > > meta data like the name/host/app that generated metric out of the
> > > > > message,
> > > > > > and have that be embodied in the name of the topic itself
> instead.
> > > > > >  Alternatively, we could have a relatively small number of
> topics,
> > > > which
> > > > > > contain messages which include source meta data along with the
> > > > timestamp
> > > > > > and metric value in each message.
> > > > > >
> > > > > > 1. On one hand, we'd have a large number of topics (say several
> > > hundred
> > > > > > thousand topics) with small messages, generated at a steady rate
> > (say
> > > > one
> > > > > > every 10 seconds).
> > > > > >
> > > > > > 2. Alternatively, we could have just few topics, which receive
> > > several
> > > > > > hundred thousand messages every 10 seconds, which contain 2 or 3
> > > times
> > > > > more
> > > > > > data per message.
> > > > > >
> > > > > > I'm wondering if kafka has any performance characteristics that
> > > differ
> > > > > for
> > > > > > the 2 scenarios.
> > > > > >
> > > > > > I like #1 because it simplifies targeted message consumption, and
> > > > enables
> > > > > > more interesting use of TopicFilter'ing.  But I'm unsure whether
> > > there
> > > > > > might be performance concerns with kafka (does it have to do more
> > > work
> > > > to
> > > > > > separately manage each topic?).  Is this a common use case, or
> not?
> > > > > >
> > > > > > Thanks for any insight.
> > > > > >
> > > > > > Jason
> > > > >
> > > >
> > >
> >
>

Re: implications of using large number of topics....

Posted by Mathias Söderberg <ma...@gmail.com>.
Hey all,

This is a quite interesting topic (no pun intended), and I've seen it come
up at least once before.

Me and a friend started experimenting with Kafka and ZooKeeper a little
while ago (building a publisher / subscriber system with consistent hashing
and whatnot) and currently we're using around 300 topics, all with one
partition each. So far we haven't really done any serious performance
testing, but I'm planning to do so in the following weeks. But I've got a
few questions regardless:


Does / should it make any difference in performance when one has a lot of
topics compared to having one topic with a lot of partitions? I'm imagining
that the system still needs to keep the same number of file descriptors
open, but I'm not sure how this would affect reads and writes? Are we going
to run into more random reads and writes by using a lot of topics compared
to using one topic with a lot of partitions instead? Can't really wrap my
head around this right now, mostly because of my rather limited knowledge
about how disks and page caches work.

Could also add that we're mostly doing sequential reads (in rare cases we
have to rewind a topic) and that the number of topics doesn't change.

On 11 October 2012 05:13, Taylor Gautier <tg...@gmail.com> wrote:

> We used pattern #1 at Tagged.  I wouldn't recommend it unless you're really
> committed.  It took a lot of work to get it working right.
>
> a) Performance degraded non-linearly (read it fell off a cliff) when
> brokers were managing more than about 20k topics.  This was on a Linux RHEL
> 5.3 system with EXT3.  YMMV.
>
> b) Startup time is significantly longer for a broker that is restarted due
> to communication with ZK to sync up on those topics.
>
> c) If topics are short lived, even if Kafka expires the data segments using
> it's standard 0.7 cleaner, the directory name for the topic will still
> exist on disk and the topic is still considered "active" (in memory) in
> Kafka.  This causes problems - see a above (open file handles).
>
> d) Message latency is affected.  Kafka syncs messages to disk if x messages
> have buffered in memory, or y seconds have elapsed (both configurable).  If
> you have few topics and many messages (pattern #2), you will be hitting the
> x limit quite often, and get good throughput.  However, if you have many
> topics and few messages per topic (pattern #1), you will have to rely on
> the y threshold to flush to disk, and setting this too low can impact
> performance (throughput) in a significant way.  Jay already mentioned this
> as random writes.
>
> We had to implement a number of solutions ourselves to resolve these
> issues, namely:
>
> #1 No ZK.  This means that all of the automatic partitioning done by Kafka
> is not available, so we had to roll our own (luckily Tagged is pretty used
> to scaling systems so there was much in-house expertise).  The solution
> here was to implement a R/W proxy layer of machines to intercept messages
> and read/write to/from Kafka handling the sharding at the proxy layer.
>  Because most of our messages were coming from PHP and we didn't want to
> use TCP we needed a UDP/TCP bridge/proxy anyway so this wasn't a huge deal
> (also, we wanted strict ordering of messages, so we needed a shard by topic
> feature anyway (I believe this can be done in 0.7 but we started with 0.6)
>
> #2 Custom cleaner.  We implemented an extra cleanup task inside the Kafka
> process that could completely remove a topic from memory and disk.  For
> clients, this sometimes meant that a subscribed topic suddenly changed it's
> physical offset from some offset X to 0, but that's ok, while technically
> it probably would never happen theoretically clients should have to handle
> this case anyway because the Kafka physical message space is limited to
> 64-bits (again, unlikely to ever wrap in practice, but you never know).
>  Anyway it's pretty easy to handle this just catch the "invalid offset"
> error Kafka gives and start at 0.
>
> #3 Low threshold for flush.  This gave us good latency, but poor throughput
> (relatively speaking).  We had more than enough throughput, but it was
> nowhere near what Kafka can do when setup in pattern #1.
>
> Given that you want to manage "hundreds of thousands of topics" that may
> mean that you would need 10's of Kafka brokers which could be another
> source of problems - it's more cost, more management, and more sources of
> failure.  SSD's may help solve this problem btw, but now you are talking
> expensive machines rather than using just off the shelf cheapo servers with
> standard SATA drives.
>
> On Wed, Oct 10, 2012 at 4:25 PM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Yes the footprint of a topic is one directory per partition (a topic can
> > have many subpartitions per partitions). Each directory contains one or
> > more files (depending on how much data you are retaining and the segment
> > size, both configurable).
> >
> > In addition to having lots of open files, which certainly scales up to
> the
> > hundreds of thousands, this will also impact the I/O pattern. As the
> number
> > of files increases the data written to each file necessarily decreases.
> > This likely means lots of random I/O. The OS can group together writes,
> but
> > if you only doing a single write per topic every now and then there will
> be
> > nothing to group and you will lots of small random I/O. This will
> > definitely impact throughput. I don't know where the practical limits are
> > we have tested up to ~500 topics and see reasonable performance. We have
> > not done serious performance testing with tens of thousands of topics or
> > more.
> >
> > In addition to the filesystem concerns there is metadata kept for each
> > partition in zk, and I believe zk keeps this metadata in memory.
> >
> > -Jay
> >
> > On Wed, Oct 10, 2012 at 4:12 PM, Jason Rosenberg <jb...@squareup.com>
> wrote:
> >
> > > Ok,
> > >
> > > Perhaps for the sake of argument, consider the question if we have
> just 1
> > > kafka broker.  It sounds like it will need to keep a file handle open
> for
> > > each topic?  Is that right?
> > >
> > > Jason
> > >
> > > On Wed, Oct 10, 2012 at 4:05 PM, Neha Narkhede <
> neha.narkhede@gmail.com
> > > >wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > We use option #2 at LinkedIn for metrics and tracking data.
> Supporting
> > > > Option #1 in Kafka 0.7 has its challenges since every topic is stored
> > > > on every broker, by design. Hence, the number of topics a cluster can
> > > > support is limited by the IO and number of open file handles on each
> > > > broker. After Kafka 0.8 is released, the distribution of topics to
> > > > brokers is user defined and can scale out with the number of brokers.
> > > > Having said that, some Kafka users have successfully deployed Kafka
> > > > 0.7 clusters hosting very high number of topics. I hope they can
> share
> > > > their experiences here.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > > On Wed, Oct 10, 2012 at 3:57 PM, Jason Rosenberg <jb...@squareup.com>
> > > wrote:
> > > > > Hi,
> > > > >
> > > > > I'm exploring using kafka for the first time.
> > > > >
> > > > > I'm contemplating a system where we transmit metric data at regular
> > > > > intervals to kafka.  One question I have is whether to generate
> > simple
> > > > > messages with very little meta data (just timestamp and value), and
> > > > keeping
> > > > > meta data like the name/host/app that generated metric out of the
> > > > message,
> > > > > and have that be embodied in the name of the topic itself instead.
> > > > >  Alternatively, we could have a relatively small number of topics,
> > > which
> > > > > contain messages which include source meta data along with the
> > > timestamp
> > > > > and metric value in each message.
> > > > >
> > > > > 1. On one hand, we'd have a large number of topics (say several
> > hundred
> > > > > thousand topics) with small messages, generated at a steady rate
> (say
> > > one
> > > > > every 10 seconds).
> > > > >
> > > > > 2. Alternatively, we could have just few topics, which receive
> > several
> > > > > hundred thousand messages every 10 seconds, which contain 2 or 3
> > times
> > > > more
> > > > > data per message.
> > > > >
> > > > > I'm wondering if kafka has any performance characteristics that
> > differ
> > > > for
> > > > > the 2 scenarios.
> > > > >
> > > > > I like #1 because it simplifies targeted message consumption, and
> > > enables
> > > > > more interesting use of TopicFilter'ing.  But I'm unsure whether
> > there
> > > > > might be performance concerns with kafka (does it have to do more
> > work
> > > to
> > > > > separately manage each topic?).  Is this a common use case, or not?
> > > > >
> > > > > Thanks for any insight.
> > > > >
> > > > > Jason
> > > >
> > >
> >
>

Re: implications of using large number of topics....

Posted by Taylor Gautier <tg...@gmail.com>.
We used pattern #1 at Tagged.  I wouldn't recommend it unless you're really
committed.  It took a lot of work to get it working right.

a) Performance degraded non-linearly (read it fell off a cliff) when
brokers were managing more than about 20k topics.  This was on a Linux RHEL
5.3 system with EXT3.  YMMV.

b) Startup time is significantly longer for a broker that is restarted due
to communication with ZK to sync up on those topics.

c) If topics are short lived, even if Kafka expires the data segments using
it's standard 0.7 cleaner, the directory name for the topic will still
exist on disk and the topic is still considered "active" (in memory) in
Kafka.  This causes problems - see a above (open file handles).

d) Message latency is affected.  Kafka syncs messages to disk if x messages
have buffered in memory, or y seconds have elapsed (both configurable).  If
you have few topics and many messages (pattern #2), you will be hitting the
x limit quite often, and get good throughput.  However, if you have many
topics and few messages per topic (pattern #1), you will have to rely on
the y threshold to flush to disk, and setting this too low can impact
performance (throughput) in a significant way.  Jay already mentioned this
as random writes.

We had to implement a number of solutions ourselves to resolve these
issues, namely:

#1 No ZK.  This means that all of the automatic partitioning done by Kafka
is not available, so we had to roll our own (luckily Tagged is pretty used
to scaling systems so there was much in-house expertise).  The solution
here was to implement a R/W proxy layer of machines to intercept messages
and read/write to/from Kafka handling the sharding at the proxy layer.
 Because most of our messages were coming from PHP and we didn't want to
use TCP we needed a UDP/TCP bridge/proxy anyway so this wasn't a huge deal
(also, we wanted strict ordering of messages, so we needed a shard by topic
feature anyway (I believe this can be done in 0.7 but we started with 0.6)

#2 Custom cleaner.  We implemented an extra cleanup task inside the Kafka
process that could completely remove a topic from memory and disk.  For
clients, this sometimes meant that a subscribed topic suddenly changed it's
physical offset from some offset X to 0, but that's ok, while technically
it probably would never happen theoretically clients should have to handle
this case anyway because the Kafka physical message space is limited to
64-bits (again, unlikely to ever wrap in practice, but you never know).
 Anyway it's pretty easy to handle this just catch the "invalid offset"
error Kafka gives and start at 0.

#3 Low threshold for flush.  This gave us good latency, but poor throughput
(relatively speaking).  We had more than enough throughput, but it was
nowhere near what Kafka can do when setup in pattern #1.

Given that you want to manage "hundreds of thousands of topics" that may
mean that you would need 10's of Kafka brokers which could be another
source of problems - it's more cost, more management, and more sources of
failure.  SSD's may help solve this problem btw, but now you are talking
expensive machines rather than using just off the shelf cheapo servers with
standard SATA drives.

On Wed, Oct 10, 2012 at 4:25 PM, Jay Kreps <ja...@gmail.com> wrote:

> Yes the footprint of a topic is one directory per partition (a topic can
> have many subpartitions per partitions). Each directory contains one or
> more files (depending on how much data you are retaining and the segment
> size, both configurable).
>
> In addition to having lots of open files, which certainly scales up to the
> hundreds of thousands, this will also impact the I/O pattern. As the number
> of files increases the data written to each file necessarily decreases.
> This likely means lots of random I/O. The OS can group together writes, but
> if you only doing a single write per topic every now and then there will be
> nothing to group and you will lots of small random I/O. This will
> definitely impact throughput. I don't know where the practical limits are
> we have tested up to ~500 topics and see reasonable performance. We have
> not done serious performance testing with tens of thousands of topics or
> more.
>
> In addition to the filesystem concerns there is metadata kept for each
> partition in zk, and I believe zk keeps this metadata in memory.
>
> -Jay
>
> On Wed, Oct 10, 2012 at 4:12 PM, Jason Rosenberg <jb...@squareup.com> wrote:
>
> > Ok,
> >
> > Perhaps for the sake of argument, consider the question if we have just 1
> > kafka broker.  It sounds like it will need to keep a file handle open for
> > each topic?  Is that right?
> >
> > Jason
> >
> > On Wed, Oct 10, 2012 at 4:05 PM, Neha Narkhede <neha.narkhede@gmail.com
> > >wrote:
> >
> > > Hi Jason,
> > >
> > > We use option #2 at LinkedIn for metrics and tracking data. Supporting
> > > Option #1 in Kafka 0.7 has its challenges since every topic is stored
> > > on every broker, by design. Hence, the number of topics a cluster can
> > > support is limited by the IO and number of open file handles on each
> > > broker. After Kafka 0.8 is released, the distribution of topics to
> > > brokers is user defined and can scale out with the number of brokers.
> > > Having said that, some Kafka users have successfully deployed Kafka
> > > 0.7 clusters hosting very high number of topics. I hope they can share
> > > their experiences here.
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Wed, Oct 10, 2012 at 3:57 PM, Jason Rosenberg <jb...@squareup.com>
> > wrote:
> > > > Hi,
> > > >
> > > > I'm exploring using kafka for the first time.
> > > >
> > > > I'm contemplating a system where we transmit metric data at regular
> > > > intervals to kafka.  One question I have is whether to generate
> simple
> > > > messages with very little meta data (just timestamp and value), and
> > > keeping
> > > > meta data like the name/host/app that generated metric out of the
> > > message,
> > > > and have that be embodied in the name of the topic itself instead.
> > > >  Alternatively, we could have a relatively small number of topics,
> > which
> > > > contain messages which include source meta data along with the
> > timestamp
> > > > and metric value in each message.
> > > >
> > > > 1. On one hand, we'd have a large number of topics (say several
> hundred
> > > > thousand topics) with small messages, generated at a steady rate (say
> > one
> > > > every 10 seconds).
> > > >
> > > > 2. Alternatively, we could have just few topics, which receive
> several
> > > > hundred thousand messages every 10 seconds, which contain 2 or 3
> times
> > > more
> > > > data per message.
> > > >
> > > > I'm wondering if kafka has any performance characteristics that
> differ
> > > for
> > > > the 2 scenarios.
> > > >
> > > > I like #1 because it simplifies targeted message consumption, and
> > enables
> > > > more interesting use of TopicFilter'ing.  But I'm unsure whether
> there
> > > > might be performance concerns with kafka (does it have to do more
> work
> > to
> > > > separately manage each topic?).  Is this a common use case, or not?
> > > >
> > > > Thanks for any insight.
> > > >
> > > > Jason
> > >
> >
>

Re: implications of using large number of topics....

Posted by Jay Kreps <ja...@gmail.com>.
Yes the footprint of a topic is one directory per partition (a topic can
have many subpartitions per partitions). Each directory contains one or
more files (depending on how much data you are retaining and the segment
size, both configurable).

In addition to having lots of open files, which certainly scales up to the
hundreds of thousands, this will also impact the I/O pattern. As the number
of files increases the data written to each file necessarily decreases.
This likely means lots of random I/O. The OS can group together writes, but
if you only doing a single write per topic every now and then there will be
nothing to group and you will lots of small random I/O. This will
definitely impact throughput. I don't know where the practical limits are
we have tested up to ~500 topics and see reasonable performance. We have
not done serious performance testing with tens of thousands of topics or
more.

In addition to the filesystem concerns there is metadata kept for each
partition in zk, and I believe zk keeps this metadata in memory.

-Jay

On Wed, Oct 10, 2012 at 4:12 PM, Jason Rosenberg <jb...@squareup.com> wrote:

> Ok,
>
> Perhaps for the sake of argument, consider the question if we have just 1
> kafka broker.  It sounds like it will need to keep a file handle open for
> each topic?  Is that right?
>
> Jason
>
> On Wed, Oct 10, 2012 at 4:05 PM, Neha Narkhede <neha.narkhede@gmail.com
> >wrote:
>
> > Hi Jason,
> >
> > We use option #2 at LinkedIn for metrics and tracking data. Supporting
> > Option #1 in Kafka 0.7 has its challenges since every topic is stored
> > on every broker, by design. Hence, the number of topics a cluster can
> > support is limited by the IO and number of open file handles on each
> > broker. After Kafka 0.8 is released, the distribution of topics to
> > brokers is user defined and can scale out with the number of brokers.
> > Having said that, some Kafka users have successfully deployed Kafka
> > 0.7 clusters hosting very high number of topics. I hope they can share
> > their experiences here.
> >
> > Thanks,
> > Neha
> >
> > On Wed, Oct 10, 2012 at 3:57 PM, Jason Rosenberg <jb...@squareup.com>
> wrote:
> > > Hi,
> > >
> > > I'm exploring using kafka for the first time.
> > >
> > > I'm contemplating a system where we transmit metric data at regular
> > > intervals to kafka.  One question I have is whether to generate simple
> > > messages with very little meta data (just timestamp and value), and
> > keeping
> > > meta data like the name/host/app that generated metric out of the
> > message,
> > > and have that be embodied in the name of the topic itself instead.
> > >  Alternatively, we could have a relatively small number of topics,
> which
> > > contain messages which include source meta data along with the
> timestamp
> > > and metric value in each message.
> > >
> > > 1. On one hand, we'd have a large number of topics (say several hundred
> > > thousand topics) with small messages, generated at a steady rate (say
> one
> > > every 10 seconds).
> > >
> > > 2. Alternatively, we could have just few topics, which receive several
> > > hundred thousand messages every 10 seconds, which contain 2 or 3 times
> > more
> > > data per message.
> > >
> > > I'm wondering if kafka has any performance characteristics that differ
> > for
> > > the 2 scenarios.
> > >
> > > I like #1 because it simplifies targeted message consumption, and
> enables
> > > more interesting use of TopicFilter'ing.  But I'm unsure whether there
> > > might be performance concerns with kafka (does it have to do more work
> to
> > > separately manage each topic?).  Is this a common use case, or not?
> > >
> > > Thanks for any insight.
> > >
> > > Jason
> >
>

Re: implications of using large number of topics....

Posted by Jason Rosenberg <jb...@squareup.com>.
Ok,

Perhaps for the sake of argument, consider the question if we have just 1
kafka broker.  It sounds like it will need to keep a file handle open for
each topic?  Is that right?

Jason

On Wed, Oct 10, 2012 at 4:05 PM, Neha Narkhede <ne...@gmail.com>wrote:

> Hi Jason,
>
> We use option #2 at LinkedIn for metrics and tracking data. Supporting
> Option #1 in Kafka 0.7 has its challenges since every topic is stored
> on every broker, by design. Hence, the number of topics a cluster can
> support is limited by the IO and number of open file handles on each
> broker. After Kafka 0.8 is released, the distribution of topics to
> brokers is user defined and can scale out with the number of brokers.
> Having said that, some Kafka users have successfully deployed Kafka
> 0.7 clusters hosting very high number of topics. I hope they can share
> their experiences here.
>
> Thanks,
> Neha
>
> On Wed, Oct 10, 2012 at 3:57 PM, Jason Rosenberg <jb...@squareup.com> wrote:
> > Hi,
> >
> > I'm exploring using kafka for the first time.
> >
> > I'm contemplating a system where we transmit metric data at regular
> > intervals to kafka.  One question I have is whether to generate simple
> > messages with very little meta data (just timestamp and value), and
> keeping
> > meta data like the name/host/app that generated metric out of the
> message,
> > and have that be embodied in the name of the topic itself instead.
> >  Alternatively, we could have a relatively small number of topics, which
> > contain messages which include source meta data along with the timestamp
> > and metric value in each message.
> >
> > 1. On one hand, we'd have a large number of topics (say several hundred
> > thousand topics) with small messages, generated at a steady rate (say one
> > every 10 seconds).
> >
> > 2. Alternatively, we could have just few topics, which receive several
> > hundred thousand messages every 10 seconds, which contain 2 or 3 times
> more
> > data per message.
> >
> > I'm wondering if kafka has any performance characteristics that differ
> for
> > the 2 scenarios.
> >
> > I like #1 because it simplifies targeted message consumption, and enables
> > more interesting use of TopicFilter'ing.  But I'm unsure whether there
> > might be performance concerns with kafka (does it have to do more work to
> > separately manage each topic?).  Is this a common use case, or not?
> >
> > Thanks for any insight.
> >
> > Jason
>

Re: implications of using large number of topics....

Posted by Neha Narkhede <ne...@gmail.com>.
Hi Jason,

We use option #2 at LinkedIn for metrics and tracking data. Supporting
Option #1 in Kafka 0.7 has its challenges since every topic is stored
on every broker, by design. Hence, the number of topics a cluster can
support is limited by the IO and number of open file handles on each
broker. After Kafka 0.8 is released, the distribution of topics to
brokers is user defined and can scale out with the number of brokers.
Having said that, some Kafka users have successfully deployed Kafka
0.7 clusters hosting very high number of topics. I hope they can share
their experiences here.

Thanks,
Neha

On Wed, Oct 10, 2012 at 3:57 PM, Jason Rosenberg <jb...@squareup.com> wrote:
> Hi,
>
> I'm exploring using kafka for the first time.
>
> I'm contemplating a system where we transmit metric data at regular
> intervals to kafka.  One question I have is whether to generate simple
> messages with very little meta data (just timestamp and value), and keeping
> meta data like the name/host/app that generated metric out of the message,
> and have that be embodied in the name of the topic itself instead.
>  Alternatively, we could have a relatively small number of topics, which
> contain messages which include source meta data along with the timestamp
> and metric value in each message.
>
> 1. On one hand, we'd have a large number of topics (say several hundred
> thousand topics) with small messages, generated at a steady rate (say one
> every 10 seconds).
>
> 2. Alternatively, we could have just few topics, which receive several
> hundred thousand messages every 10 seconds, which contain 2 or 3 times more
> data per message.
>
> I'm wondering if kafka has any performance characteristics that differ for
> the 2 scenarios.
>
> I like #1 because it simplifies targeted message consumption, and enables
> more interesting use of TopicFilter'ing.  But I'm unsure whether there
> might be performance concerns with kafka (does it have to do more work to
> separately manage each topic?).  Is this a common use case, or not?
>
> Thanks for any insight.
>
> Jason