Posted to dev@kafka.apache.org by Jay Kreps <ja...@gmail.com> on 2013/07/26 21:00:02 UTC

Client improvement discussion

I sent around a wiki a few weeks back proposing a set of client
improvements that essentially amount to a rewrite of the producer and
consumer java clients.

https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite

The below discussion assumes you have read this wiki.

I started to do a little prototyping for the producer and wanted to share
some of the ideas that came up to get early feedback.

First, a few simple but perhaps controversial things to discuss.

Rollout
Phase 1: We add the new clients. No change on the server. Old clients still
exist. The new clients will be entirely in a new package so there will be
no possibility of name collision.
Phase 2: We swap out all shared code on the server to use the new client
stuff. At this point the old clients still exist but are essentially
deprecated.
Phase 3: We remove the old client code.

Java
I think we should do the clients in java. Making our users deal with
scala's incompatibility issues and crazy stack traces causes people a lot
of pain. Furthermore we end up having to wrap everything now to get a
usable java api anyway for non-scala people. This does mean maintaining a
substantial chunk of java code, which is maybe less fun than scala. But
basically I think we should optimize for the end user and produce a
standalone pure-java jar with no dependencies.

Jars
We definitely want to separate out the client jar. There is also a fair
amount of code shared between both (exceptions, protocol definition, utils,
and the message set implementation). Two approaches.
Two jar approach: split kafka.jar into kafka-clients.jar and
kafka-server.jar with the server depending on the clients. The advantage of
this is that it is simple. The disadvantage is that things like utils and
protocol definition will be in the client jar though technically they belong
equally to the server.
Many jar approach: split kafka.jar into kafka-common.jar,
kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
kafka-server.jar. The disadvantage of this is that the user needs two jars
(common + something) which is for sure going to confuse people. I also
think this will tend to spawn more jars over time.

Background threads
I am thinking of moving both serialization and compression out of the
background send thread. I will explain a little about this idea below.

Serialization
I am not sure if we should handle serialization in the client at all.
Basically I wonder if our own API wouldn't just be a lot simpler if we took
a byte[] key and byte[] value and let people serialize stuff themselves.
Injecting a class name for us to create the serializer is more roundabout
and has a lot of problems if the serializer itself requires a lot of
configuration or other objects to be instantiated.

Partitioning
The real question with serialization is whether the partitioning should
happen on the java object or on the byte array key. The argument for doing
it on the java object is that it is easier to do something like a range
partition on the object. The problem with doing it on the object is that
the consumer may not be in java and so may not be able to reproduce the
partitioning. For example we currently use Object.hashCode which is a
little sketchy. We would be better off doing a standardized hash function
on the key bytes. If we want to give the partitioner access to the original
java object then obviously we need to handle serialization behind our api.
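
A minimal sketch of partitioning on the key bytes, so that a non-java
client could reproduce the same assignment. The class name BytesPartitioner
and the choice of CRC32 as the "standardized hash function" are assumptions
made for illustration, not something decided in this proposal:

```java
import java.util.zip.CRC32;

/** Hypothetical helper: picks a partition from the serialized key bytes only. */
final class BytesPartitioner {
    private BytesPartitioner() {}

    /** Maps key bytes to a partition in [0, numPartitions) using CRC32. */
    static int partition(byte[] keyBytes, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        CRC32 crc = new CRC32();
        crc.update(keyBytes, 0, keyBytes.length);
        // getValue() is an unsigned 32-bit value held in a long, so the
        // remainder is never negative (unlike Object.hashCode() % n).
        return (int) (crc.getValue() % numPartitions);
    }
}
```

Because the result depends only on the bytes, any client in any language
that implements the same checksum would land on the same partition.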

Names
I think good names are important. I would like to rename the following
classes in the new client:
  Message=>Record: Now that the message has both a message and a key it is
more of a KeyedMessage. Another name for a KeyedMessage is a Record.
  MessageSet=>Records: This isn't too important but nit pickers complain
that it is not technically a Set but rather a List or Sequence but
MessageList sounds funny to me.

The actual clients will not interact with these classes. They will interact
with a ProducerRecord and ConsumerRecord. The reason for having different
fields is because the different clients
Proposed producer API:
SendResponse r = producer.send(new ProducerRecord(topic, key, value))

Protocol Definition

Here is what I am thinking about protocol definition. I see a couple of
problems with what we are doing currently. First the protocol definition is
spread throughout a bunch of custom java objects. The error reporting in
these object is really terrible because they don't record the field names.
Furthermore people keep adding business logic into the protocol objects
which is pretty nasty.

I would like to move to having a single Protocol.java file that defines the
protocol in a readable DSL. Here is what I am thinking:

  public static Schema REQUEST_HEADER =
    new Schema(new Field("api_key", INT16, "The id of the request type."),
               new Field("api_version", INT16, "The version of the API."),
               new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"),
               new Field("client_id", STRING, "A user specified identifier for the client making the request."));

To parse one of these requests you would do
   Struct struct = REQUEST_HEADER.parse(bytebuffer);
   short apiKey = struct.get("api_key");

Internally Struct is just an Object[] with one entry per field which is
populated from the schema. The mapping of name to array index is a hash
table lookup. We can optimize access for performance critical areas by
allowing:
   // do this once to lookup the index of the field
   static Field apiKeyField = REQUEST_HEADER.getField("api_key");
   ...
   Struct struct = REQUEST_HEADER.parse(bytebuffer);
   short apiKey = struct.get(apiKeyField); // now this is just an array access

One advantage of this is this level of indirection will make it really easy
for us to handle backwards compatibility in a more principled way. The
protocol file will actually contain ALL versions of the schema and we will
always use the appropriate version to read the request (as specified in the
header).
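
To make the Schema/Struct idea concrete, here is a minimal sketch under
some assumptions: only INT16 and INT32 are modeled (STRING is left out for
brevity), get() returns Object rather than a typed value, and the FieldType
enum and the index bookkeeping are illustrative guesses, not the eventual
implementation:

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Field types for this sketch; only two fixed-width integers are modeled. */
enum FieldType { INT16, INT32 }

final class Field {
    final String name;
    final FieldType type;
    final String doc;
    int index = -1; // assigned by the owning Schema (so a Field belongs to one Schema)

    Field(String name, FieldType type, String doc) {
        this.name = name;
        this.type = type;
        this.doc = doc;
    }
}

final class Schema {
    private final Field[] fields;
    private final Map<String, Field> byName = new HashMap<>();

    Schema(Field... fields) {
        this.fields = fields;
        for (int i = 0; i < fields.length; i++) {
            fields[i].index = i;
            byName.put(fields[i].name, fields[i]);
        }
    }

    /** Hash lookup by name; do this once and keep the Field for hot paths. */
    Field getField(String name) {
        Field f = byName.get(name);
        if (f == null) throw new IllegalArgumentException("No such field: " + name);
        return f;
    }

    /** Reads the fields in declaration order into an Object[]-backed Struct. */
    Struct parse(ByteBuffer buffer) {
        Object[] values = new Object[fields.length];
        for (Field f : fields) {
            switch (f.type) {
                case INT16: values[f.index] = buffer.getShort(); break;
                case INT32: values[f.index] = buffer.getInt(); break;
            }
        }
        return new Struct(this, values);
    }
}

final class Struct {
    private final Schema schema;
    private final Object[] values;

    Struct(Schema schema, Object[] values) {
        this.schema = schema;
        this.values = values;
    }

    Object get(String name) { return values[schema.getField(name).index]; } // hash lookup
    Object get(Field field) { return values[field.index]; }                 // array access
}
```

Since the schema knows every field name, a parse failure could report which
field it was reading, which is the error-reporting gap mentioned above.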

NIO layer

The plan is to add a non-blocking multi-connection abstraction that would
be used by both clients.

class Selector {
  /* create a new connection and associate it with the given id */
  public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)
  /* wakeup this selector if it is currently awaiting data */
  public void wakeup()
  /* user provides sends, receives, and a timeout. this method will
     populate "completed" and "disconnects" lists. Method blocks for up to the
     timeout waiting for data to read. */
  public void poll(long timeout, List<Send> sends, List<Send> completed, List<Receive> receives, List<Integer> disconnects)
}

The consumer and producer would then each define their own logic to manage
their set of in-flight requests.

Producer Implementation

There are a couple of interesting changes I think we can make to the
producer implementation.

We retain the single background "sender" thread.

But we can remove the definition of sync vs async clients. We always return
a "future" response immediately. Both sync and async sends would go through
the buffering that we currently do for the async layer. This would mean
that even in sync mode while the event loop is doing network IO if many
requests accumulate they will be sent in a single batch. This effectively
acts as a kind of "group commit". So instead of having an "async" mode that
acts differently in some way you just have a max.delay time that controls
how long the client will linger waiting for more data to accumulate.
max.delay=0 is equivalent to the current sync producer.
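
A rough sketch of what "always return a future" could look like from the
caller's side. SketchProducer and the offset field on SendResponse are
hypothetical names for illustration, and the future is completed inline
only to keep the example runnable; in the real design the background sender
thread would complete it once the batch is acknowledged:

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the proposed response type. */
final class SendResponse {
    final long offset;
    SendResponse(long offset) { this.offset = offset; }
}

/** Hypothetical producer facade: every send returns a future immediately. */
final class SketchProducer {
    private long nextOffset = 0;

    synchronized CompletableFuture<SendResponse> send(byte[] key, byte[] value) {
        CompletableFuture<SendResponse> future = new CompletableFuture<>();
        // Placeholder for "sender thread got the ack for this record".
        future.complete(new SendResponse(nextOffset++));
        return future;
    }
}
```

A caller that wants sync behavior just blocks on the returned future
(join() or get()); a caller that wants async behavior keeps going and
checks it later. There is one code path either way.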

I would also propose changing our buffering strategy. Currently we queue
unserialized requests in a BlockingQueue. This is not ideal as it is very
difficult to reason about the memory usage of this queue. One 5MB message
may be bigger than 10k small messages. I propose that (1) we change our
queuing strategy to queue per-partition and (2) we directly write the
messages to the ByteBuffer which will eventually be sent and use that as
the "queue". The batch size should likewise be in bytes not in number of
messages.

If you think about it our current queuing strategy doesn't really make
sense any more now that we always load balance over brokers. You set a
batch size of N and we do a request when we have N messages in queue but
this says nothing about the size of the requests that will be sent. You
might end up sending all N messages to one server or you might end up
sending 1 message to N different servers (totally defeating the purpose of
batching).

There are two granularities of batching that could make sense: the broker
level or the partition level. We do the send requests at the broker level
but we do the disk IO at the partition level. I propose making the queues
per-partition rather than per broker to avoid having to reshuffle the
contents of queues when leadership changes. This could be debated, though.

If you actually look at the byte path of the producer this approach allows
cleaning a ton of stuff up. We can do in-place writes to the destination
buffer that we will eventually send. This does mean moving serialization
and compression to the user thread. But I think this is good as these may
be slow but aren't unpredictably slow.

The per-partition queues are thus implemented with a bunch of pre-allocated
ByteBuffers sized to max.batch.size. When the buffer is full or the delay
time elapses, that buffer is sent.
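
A minimal sketch of the per-partition buffer acting as the queue. The class
name PartitionAccumulator is hypothetical, the 4-byte length prefix is a
stand-in for the real message format, and the delay timer is left out so
only the "buffer is full" trigger is shown:

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical accumulator: one pre-sized ByteBuffer per partition is the queue. */
final class PartitionAccumulator {
    private final int maxBatchSize; // bytes, standing in for max.batch.size
    private final Map<Integer, ByteBuffer> open = new HashMap<>();

    PartitionAccumulator(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /**
     * Appends a length-prefixed record to the partition's buffer.
     * Returns a full batch that is ready to send, or null if no batch filled up.
     */
    synchronized ByteBuffer append(int partition, byte[] record) {
        int needed = 4 + record.length;
        if (needed > maxBatchSize) {
            throw new IllegalArgumentException("record larger than max batch size");
        }
        ByteBuffer ready = null;
        ByteBuffer buf = open.get(partition);
        if (buf != null && buf.remaining() < needed) {
            buf.flip(); // switch the full buffer to read mode for the sender
            ready = buf;
            buf = null;
        }
        if (buf == null) {
            buf = ByteBuffer.allocate(maxBatchSize);
            open.put(partition, buf);
        }
        // Write in place into the bytes that will eventually go on the wire.
        buf.putInt(record.length).put(record);
        return ready;
    }
}
```

Memory use here is easy to reason about: it is the number of open
partitions times the batch size, regardless of how many records are queued.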

By doing this we could actually just reuse the buffers when the send is
complete. This would be nice because since the buffers are used for
allocation they will likely fall out of young gen.

A question to think about is how we want to bound memory usage. I think
what we want is the max.batch.size which controls the size of the
individual buffers and total.buffer.memory which controls the total memory
used by all buffers. One problem with this is that there is the possibility
of some fragmentation. I.e. imagine a situation with 5k partitions being
produced to, each getting a low but steady message rate. Giving each of
these a 1MB buffer would require 5GB of buffer space to have a buffer for
each partition. I'm not sure how bad this is since at least the memory
usage is predictable and the reality is that holding thousands of java
objects has huge overhead compared to contiguous byte arrays.
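
A small sketch of how the two settings could bound memory while letting
buffers be reused after a send completes. BoundedBufferPool is a
hypothetical name, and returning null when the budget is exhausted is just
one possible policy (blocking the caller would be another); the numbers in
worstCaseBytes mirror the 5k-partition example above:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

/** Hypothetical pool: hands out fixed-size buffers until the memory budget is used. */
final class BoundedBufferPool {
    private final int bufferSize;   // stands in for max.batch.size
    private final long totalMemory; // stands in for total.buffer.memory
    private long allocated = 0;
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();

    BoundedBufferPool(int bufferSize, long totalMemory) {
        this.bufferSize = bufferSize;
        this.totalMemory = totalMemory;
    }

    /** Returns a reused or new buffer, or null when the budget is exhausted. */
    synchronized ByteBuffer tryAcquire() {
        ByteBuffer reused = free.poll();
        if (reused != null) return reused;
        if (allocated + bufferSize > totalMemory) return null;
        allocated += bufferSize;
        return ByteBuffer.allocate(bufferSize);
    }

    /** Called when a send completes; the buffer is cleared and kept for reuse. */
    synchronized void release(ByteBuffer buffer) {
        buffer.clear();
        free.push(buffer);
    }

    /** Memory needed to give every partition its own buffer at the same time. */
    static long worstCaseBytes(int partitions, int bufferSize) {
        return (long) partitions * bufferSize;
    }
}
```

With 5000 partitions and 1MB (1 << 20 byte) buffers, worstCaseBytes comes
out to 5,242,880,000 bytes, i.e. the roughly 5GB mentioned above.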

-Jay

Re: Client improvement discussion

Posted by Chris Hogue <cs...@gmail.com>.
Hi Jay.

Agreed, we're planning to try to do the compression before the producer
with 0.8 as we get back to this part, hopefully this week. I saw your other
JIRA issue about the copying in the code path so that looks promising as
well.

Thanks for all the responses, really appreciate the effort and 0.8 is
looking like it will work well for us. We'll certainly provide patches as
we have them.

-Chris




On Fri, Aug 2, 2013 at 8:41 PM, Jay Kreps <ja...@gmail.com> wrote:

> Cool. With respect to compression performance, we definitely see the same
> thing, no debate.
>
> Of course if you want to just compress the message payloads you can do that
> now without needing much help from kafka--just pass in the compressed data.
> Whether or not it will do much depends on the size of the message body--for
> small messages you basically need batch compression, but for large messages
> just compressing the body is fine. Our extra effort was to get the better
> compression ratio of compressed messages.
>
> What I was saying about snappy performance is that I think it may be our
> inefficiency in the compression code-path rather than the underlying
> slowness of snappy. For example on this page
>   https://github.com/dain/snappy
> The compression performance they list for jni (the library we use) tends to
> be around 200MB per core-second, with decompression around 1GB per
> core-second. So on a modern machine with umpteen cores that should not be a
> bottleneck, right? I don't know this to be true but I am wondering if the
> underlying bottleneck is the compression algorithm or our inefficient
> code. If you look at kafka.message.ByteBufferMessageSet.{create,
> decompress, and assignOffsets} it is pretty inefficient. I did a round of
> improvement there but we are still recopying stuff over and over and
> creating zillions of little buffers and objects. It is a little tricky to
> clean up but probably just a 1-2 day project.
>
> I would rather figure out that it is really the compression that is the
> root cause rather than just our inefficiency before we do anything too
> drastic design wise. If this is really killing you guys, and if that turns
> out to be the cause, we would definitely take a patch to optimize that path
> now.
>
> -Jay
>
>
>
>
> On Fri, Aug 2, 2013 at 4:55 PM, Chris Hogue <cs...@gmail.com> wrote:
>
> > Thanks for the responses. Additional follow-up inline.
> >
> >
> > On Fri, Aug 2, 2013 at 2:21 PM, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > Great comments, answers inline!
> > >
> > > On Fri, Aug 2, 2013 at 12:28 PM, Chris Hogue <cs...@gmail.com>
> wrote:
> > >
> > > > > These sound like great steps. A couple of votes and questions:
> > > >
> > > > 1.  Moving serialization out and basing it all off of byte[] for key
> > and
> > > > payload makes sense. Echoing a response below, we've ended up doing
> > that
> > > in
> > > > some cases anyway, and the others do a trivial transform to bytes
> with
> > an
> > > > Encoder.
> > > >
> > >
> > > Cool.
> > >
> > >
> > > > 2. On the single producer thread, we're actually suffering a bit from
> > > this
> > > > in 0.8, but it's mostly because compression and the blocking send
> > happen
> > > on
> > > > this thread. In 0.7 since there was a thread-per-broker, a nice
> > > side-effect
> > > > was that compression and the blocking could "go wide", at least to
> the
> > > > number of brokers. If compression is moved out and the sends are now
> > > > non-blocking then this sounds like a nice improvement.
> > > >
> > >
> > > I think even in 0.7 there was only one thread, right?
> > >
> > >
> > I believe it was actually 1 per broker. Producer.scala iterates the
> brokers
> > and adds a new producer for each. The ProducerPool.addProducer() method
> > adds a new AsyncProducer instance for the broker (assuming async mode),
> and
> > each AsyncProducer creates and starts its own ProducerSendThread.
> >
> > In either case, going to multiplexed I/O and not having the compression
> on
> > this thread probably solves any issue there.
> >
> >
> >
> > >
> > > > 3. The wiki talks about static partition assignment for consumers.
> Just
> > > > adding a vote for that as we're currently working through how to do
> > that
> > > > ourselves with the 0.8 consumer.
> > > >
> > >
> > > Cool, yeah currently you must use the simple consumer to get that which
> > is
> > > a pain.
> > >
> > >
> > > > 4. I'm curious how compression would interact with the new ByteBuffer
> > > > buffering you've described. If I'm following correctly you've said
> that
> > > > rather than queueing objects you'd end up doing in-place writes to
> the
> > > > pre-allocated ByteBuffer. Presumably this means the compression has
> > > already
> > > > happened on the user thread. But if there's no batching/buffering
> > except
> > > in
> > > > the ByteBuffer, is there somewhere that multiple messages will be
> > > > compressed together (since it should result in better compression)?
> > Maybe
> > > > there's still batching before this and I read too much into it?
> > > >
> > >
> > > I'm not 100% sure, but I believe the compression can still be done
> > inline.
> > > The compression algorithm will buffer a bit, of course. What we
> currently
> > > do though is write out the full data uncompressed and then compress it.
> > > This is pretty inefficient. Basically we are using Java's OutputStream
> > apis
> > > for compression but we need to be using the lower-level array oriented
> > > algorithms like (Deflater). I haven't tried this but my assumption is
> > that
> > > we can compress the messages as they arrive into the destination buffer
> > > instead of the current approach.
> > >
> >
> >
> > Right, was starting to think you may be looking at a way of doing the
> > compression incrementally as they come in. Sounds like what you're
> > pursuing.
> >
> >
> >
> > >
> > >
> > > > 5. I don't know if this is quite the right place to discuss it, but
> > since
> > > > the producer has some involvement I'll throw it out there. The
> > > un-compress,
> > > > assign offsets, re-compress that happens on the broker with the
> > built-in
> > > > compression API is a significant bottleneck that we're really trying
> to
> > > > avoid. As noted in another thread, we saw a throughput increase on
> the
> > > > order of 3x when we pre-batched and compressed the payloads before
> > > sending
> > > > it to the producer with 0.8.
> > > >
> > >
> > > Yes, it is a bummer. We think ultimately this does make sense though,
> for
> > > two reasons beyond offsets:
> > > 1. You have to validate the integrity of the data the client has sent
> to
> > > you or else one bad or buggy client can screw up all consumers.
> > > 2. The compression of the log should not be tied to the compression
> used
> > by
> > > individual producers. We haven't made this change yet, but it is an
> easy
> > > one. The problem today is that if your producers send a variety of
> > > compression types your consumers need to handle the union of all types
> > and
> > > you have no guarantee over what types producers may send in the future.
> > > Instead we think these should be decoupled. The topic should have a
> > > compression type property and that should be totally decoupled from the
> > > compression type the producer uses. In many cases there is no real need
> > for
> > > the producer to use compression at all as the real thing you want to
> > > > optimize is later inter-datacenter transfers not the network send to the
> > > local broker so the producer can just send uncompressed and have the
> > broker
> > > control the compression type.
> > >
> > > The performance really has two causes though:
> > > 1. GZIP is super slow, especially java's implementation. But snappy,
> for
> > > example, is actually quite fast. We should be able to do snappy at
> > network
> > > speeds according to the perf data I have seen, but...
> > > 2. ...our current compression code is kind of inefficient due to all
> the
> > > copying and traversal, due to the reasons cited above.
> > >
> > > So in other words I think we can make this a bit better but it probably
> > > won't go away. How do you feel about snappy?
> > >
> > >
> > Sorry, I should have been more clear--these tests were all with Snappy
> (the
> > same library Kafka uses, just called directly from our code before it
> went
> > to the producer). I did an early GZIP test and it was just too far out of
> > the ballpark to be useful.
> >
> > I completely understand the architectural separation and the value you're
> > describing here, especially in a general solution where you may have many
> > heterogenous producer and consumer types. In our case it will be pretty
> > homogeneous and throughput is a primary concern, hence the focus on this.
> >
> > I started putting in a description of the benchmarks we did but it's
> going
> > to blow up this thread, so it's probably best if that goes in its own
> > separate thread. The summary is that at an application level, this change
> > alone is the difference between being able to send 25,000 (2.5KB)
> > messages/sec to a single broker vs over 80,000/sec. For comparison, I
> did a
> > small test that simply wrote messages to the kafka logs via the Log class
> > (in a standalone app on that machine, not through a server) and saw
> around
> > ~170,000 messages/sec. The throughput to the disk as reported by iostat
> > reflected a similar change.
> >
> > Obviously without more detail you'll have to take those numbers as a
> rough
> > sketch, and I'm happy to give more detail separately, but that's a high
> > enough cost on the broker that we really think we need to avoid it.
> >
> >
> >
> > > > I've not looked very closely at the wire-protocol, but if there was a
> > way
> > > > for it to support in-place offset assignment even for compressed
> > messages
> > > > it would be a huge win. Short of that we're fine taking the
> > > batch/compress
> > > > responsibility into user code, but it would be nice to have a way to
> do
> > > > that while retaining the built-in partition selection (i.e. semantic
> > > > partitioning) and other functionality of the producer. The new design
> > may
> > > > already be an improvement in this area since it would move some
> > > > responsibility to the user thread.
> > > >
> > >
> > > We can't really do this because we are multi-writer so any offset we
> give
> > > the client would potentially be used by another producer and then be
> > > invalid or non-sequential.
> > >
> >
> > I may have said this in a confusing way. With the tests we did it was
> still
> > the broker assigning offsets, it's just that the message as a whole
> wasn't
> > compressed, only the payloads. So the broker still had plain-bytes access
> > to the headers and went through the optimized code path that exists for
> > non-compressed messages.
> >
> >
> > Really appreciate your responses and glad to see this making progress.
> >
> >
> >
> > >
> > > >
> > > > Not sure if that's clear, but as the interfaces take shape it may be
> > > easier
> > > > to see how that will work.
> > > >
> > > > -Chris
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Jul 26, 2013 at 1:00 PM, Jay Kreps <ja...@gmail.com>
> > wrote:
> > > >
> > > > > I sent around a wiki a few weeks back proposing a set of client
> > > > > improvements that essentially amount to a rewrite of the producer
> and
> > > > > consumer java clients.
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > > > >
> > > > > The below discussion assumes you have read this wiki.
> > > > >
> > > > > I started to do a little prototyping for the producer and wanted to
> > > share
> > > > > some of the ideas that came up to get early feedback.
> > > > >
> > > > > First, a few simple but perhaps controversial things to discuss.
> > > > >
> > > > > Rollout
> > > > > Phase 1: We add the new clients. No change on the server. Old
> clients
> > > > still
> > > > > exist. The new clients will be entirely in a new package so there
> > will
> > > be
> > > > > no possibility of name collision.
> > > > > Phase 2: We swap out all shared code on the server to use the new
> > > client
> > > > > stuff. At this point the old clients still exist but are
> essentially
> > > > > deprecated.
> > > > > Phase 3: We remove the old client code.
> > > > >
> > > > > Java
> > > > > I think we should do the clients in java. Making our users deal
> with
> > > > > scala's non-compatability issues and crazy stack traces causes
> > people a
> > > > lot
> > > > > of pain. Furthermore we end up having to wrap everything now to
> get a
> > > > > usable java api anyway for non-scala people. This does mean
> > > maintaining a
> > > > > substantial chunk of java code, which is maybe less fun than scala.
> > But
> > > > > basically i think we should optimize for the end user and produce a
> > > > > standalone pure-java jar with no dependencies.
> > > > >
> > > > > Jars
> > > > > We definitely want to separate out the client jar. There is also a
> > fair
> > > > > amount of code shared between both (exceptions, protocol
> definition,
> > > > utils,
> > > > > and the message set implementation). Two approaches.
> > > > > Two jar approach: split kafka.jar into kafka-clients.jar and
> > > > > kafka-server.jar with the server depending on the clients. The
> > > advantage
> > > > of
> > > > > this is that it is simple. The disadvantage is that things like
> utils
> > > and
> > > > > protocol definition will be in the client jar though technical they
> > > > belong
> > > > > equally to the server.
> > > > > Many jar approach: split kafka.jar into kafka-common.jar,
> > > > > kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
> > > > > kafka-server.jar. The disadvantage of this is that the user needs
> two
> > > > jars
> > > > > (common + something) which is for sure going to confuse people. I
> > also
> > > > > think this will tend to spawn more jars over time.
> > > > >
> > > > > Background threads
> > > > > I am thinking of moving both serialization and compression out of
> the
> > > > > background send thread. I will explain a little about this idea
> > below.
> > > > >
> > > > > Serialization
> > > > > I am not sure if we should handle serialization in the client at
> all.
> > > > > Basically I wonder if our own API wouldn't just be a lot simpler if
> > we
> > > > took
> > > > > a byte[] key and byte[] value and let people serialize stuff
> > > themselves.
> > > > > Injecting a class name for us to create the serializer is more
> > > roundabout
> > > > > and has a lot of problems if the serializer itself requires a lot
> of
> > > > > configuration or other objects to be instantiated.
> > > > >
> > > > > Partitioning
> > > > > The real question with serialization is whether the partitioning
> > should
> > > > > happen on the java object or on the byte array key. The argument
> for
> > > > doing
> > > > > it on the java object is that it is easier to do something like a
> > range
> > > > > partition on the object. The problem with doing it on the object is
> > > that
> > > > > the consumer may not be in java and so may not be able to reproduce
> > the
> > > > > partitioning. For example we currently use Object.hashCode which
> is a
> > > > > little sketchy. We would be better off doing a standardized hash
> > > function
> > > > > on the key bytes. If we want to give the partitioner access to the
> > > > original
> > > > > java object then obviously we need to handle serialization behind
> our
> > > > api.
> > > > >
> > > > > Names
> > > > > I think good names are important. I would like to rename the
> > following
> > > > > classes in the new client:
> > > > >   Message=>Record: Now that the message has both a message and a
> key
> > it
> > > > is
> > > > > more of a KeyedMessage. Another name for a KeyedMessage is a
> Record.
> > > > >   MessageSet=>Records: This isn't too important but nit pickers
> > > complain
> > > > > that it is not technically a Set but rather a List or Sequence but
> > > > > MessageList sounds funny to me.
> > > > >
> > > > > The actual clients will not interact with these classes. They will
> > > > interact
> > > > > with a ProducerRecord and ConsumerRecord. The reason for having
> > > different
> > > > > fields is because the different clients
> > > > > Proposed producer API:
> > > > > SendResponse r = producer.send(new ProducerRecord(topic, key,
> value))
> > > > >
> > > > > Protocol Definition
> > > > >
> > > > > Here is what I am thinking about protocol definition. I see a
> couple
> > of
> > > > > problems with what we are doing currently. First the protocol
> > > definition
> > > > is
> > > > > spread throughout a bunch of custom java objects. The error
> reporting
> > > in
> > > > > these object is really terrible because they don't record the field
> > > > names.
> > > > > Furthermore people keep adding business logic into the protocol
> > objects
> > > > > which is pretty nasty.
> > > > >
> > > > > I would like to move to having a single Protocol.java file that
> > defines
> > > > the
> > > > > protocol in a readable DSL. Here is what I am thinking:
> > > > >
> > > > >   public static Schema REQUEST_HEADER =
> > > > >
> > > > >     new Schema(new Field("api_key", INT16, "The id of the request
> > > > type."),
> > > > >
> > > > >                new Field("api_version", INT16, "The version of the
> > > > API."),
> > > > >
> > > > >                  new Field("correlation_id", INT32, "A
> user-supplied
> > > > > integer value that will be passed back with the response"),
> > > > >
> > > > >                  new Field("client_id", STRING, "A user specified
> > > > > identifier for the client making the request."));
> > > > >
> > > > > To parse one of these requests you would do
> > > > >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> > > > >    short apiKey = struct.get("api_key");
> > > > >
> > > > > Internally Struct is just an Object[] with one entry per field
> which
> > is
> > > > > populated from the schema. The mapping of name to array index is a
> > hash
> > > > > table lookup. We can optimize access for performance critical areas
> > by
> > > > > allowing:
> > > > >    static Field apiKeyField = REQUEST_HEADER.getField("api_key");
> //
> > do
> > > > > this once to lookup the index of the field
> > > > >    ...
> > > > >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> > > > >    short apiKey = struct.get(apiKeyField); // now this is just an
> > array
> > > > > access
> > > > >
> > > > > One advantage of this is this level of indirection will make it
> > really
> > > > easy
> > > > > for us to handle backwards compatability in a more principled way.
> > The
> > > > > protocol file will actually contain ALL versions of the schema and
> we
> > > > will
> > > > > always use the appropriate version to read the request (as
> specified
> > in
> > > > the
> > > > > header).
> > > > >
> > > > > NIO layer
> > > > >
> > > > > The plan is to add a non-blocking multi-connection abstraction that
> > > would
> > > > > be used by both clients.
> > > > >
> > > > > class Selector {
> > > > >   /* create a new connection and associate it with the given id */
> > > > >   public void connect(int id, InetSocketAddress address,
> > > > intsendBufferSize,
> > > > > int receiveBufferSize)
> > > > >   /* wakeup this selector if it is currently awaiting data */
> > > > >   public void wakeup()
> > > > >   /* user provides sends, recieves, and a timeout. this method will
> > > > > populate "completed" and "disconnects" lists. Method blocks for up
> to
> > > the
> > > > > timeout waiting for data to read. */
> > > > >   public void poll(long timeout, List<Send> sends, List<Send>
> > > completed,
> > > > > List<Receive> receives, List<Integer> disconnects)
> > > > > }
> > > > >
> > > > > The consumer and producer would then each define their own logic to
> > > > manage
> > > > > their set of in-flight requests.
> > > > >
> > > > > Producer Implementation
> > > > >
> > > > > There are a couple of interesting changes I think we can make to
> the
> > > > > producer implementation.
> > > > >
> > > > > We retain the single background "sender" thread.
> > > > >
> > > > > But we can remove the definition of sync vs async clients. We
> always
> > > > return
> > > > > a "future" response immediately. Both sync and async sends would go
> > > > through
> > > > > the buffering that we currently do for the async layer. This would
> > mean
> > > > > that even in sync mode while the event loop is doing network IO if
> > many
> > > > > requests accumulate they will be sent in a single batch. This
> > > effectively
> > > > > acts as a kind of "group commit". So instead of having an "async"
> > mode
> > > > that
> > > > > acts differently in some way you just have a max.delay time that
> > > controls
> > > > > how long the client will linger waiting for more data to
> accumulate.
> > > > > max.delay=0 is equivalent to the current sync producer.
> > > > >
> > > > > I would also propose changing our buffering strategy. Currently we
> > > queue
> > > > > unserialized requests in a BlockingQueue. This is not ideal as it
> is
> > > very
> > > > > difficult to reason about the memory usage of this queue. One 5MB
> > > message
> > > > > may be bigger than 10k small messages. I propose that (1) we change
> > our
> > > > > queuing strategy to queue per-partition and (2) we directly write
> the
> > > > > messages to the ByteBuffer which will eventually be sent and use
> that
> > > as
> > > > > the "queue". The batch size should likewise be in bytes not in
> number
> > > of
> > > > > messages.
> > > > >
> > > > > If you think about it our current queuing strategy doesn't really
> > make
> > > > > sense any more now that we always load balance over brokers. You
> set
> > a
> > > > > batch size of N and we do a request when we have N messages in
> queue
> > > but
> > > > > this says nothing about the size of the requests that will be sent.
> > You
> > > > > might end up sending all N messages to one server or you might end
> up
> > > > > sending 1 message to N different servers (totally defeating the
> > purpose
> > > > of
> > > > > batching).
> > > > >
> > > > > There are two granularities of batching that could make sense: the
> > > broker
> > > > > level or the partition level. We do the send requests at the broker
> > > level
> > > > > but we do the disk IO at the partition level. I propose making the
> > > queues
> > > > > per-partition rather than per broker to avoid having to reshuffle
> the
> > > > > contents of queues when leadership changes. This could be debated,
> > > > though.
> > > > >
> > > > > If you actually look at the byte path of the producer this approach
> > > > > allows cleaning a ton of stuff up. We can do in-place writes to the
> > > > > destination buffer that we will eventually send. This does mean
> > > > > moving serialization and compression to the user thread. But I think
> > > > > this is good as these may be slow but aren't unpredictably slow.
> > > > >
> > > > > The per-partition queues are thus implemented with a bunch of
> > > > pre-allocated
> > > > > ByteBuffers sized to max.batch.size, when the buffer is full or the
> > > delay
> > > > > time elapses that buffer is sent.
> > > > >
> > > > > By doing this we could actually just reuse the buffers when the
> send
> > is
> > > > > complete. This would be nice because since the buffers are used for
> > > > > allocation they will likely fall out of young gen.
> > > > >
> > > > > A question to think about is how we want to bound memory usage. I
> > > > > think what we want is the max.batch.size which controls the size of
> > > > > the individual buffers and total.buffer.memory which controls the
> > > > > total memory used by all buffers. One problem with this is that there
> > > > > is the possibility of some fragmentation. I.e. imagine a situation
> > > > > with 5k partitions being produced to, each getting a low but steady
> > > > > message rate. Giving each of these a 1MB buffer would require 5GB of
> > > > > buffer space to have a buffer for each partition. I'm not sure how
> > > > > bad this is since at least the memory usage is predictable and the
> > > > > reality is that holding thousands of java objects has huge overhead
> > > > > compared to contiguous byte arrays.
> > > > >
> > > > > -Jay
> > > > >
> > > >
> > >
> >
>

Re: Client improvement discussion

Posted by Chris Hogue <cs...@gmail.com>.
Hi Jay.

Agreed, we're planning to try to do the compression before the producer
with 0.8 as we get back to this part, hopefully this week. I saw your other
JIRA issue about the copying in the code path so that looks promising as
well.

Thanks for all the responses, really appreciate the effort and 0.8 is
looking like it will work well for us. We'll certainly provide patches as
we have them.

-Chris




On Fri, Aug 2, 2013 at 8:41 PM, Jay Kreps <ja...@gmail.com> wrote:

> Cool. With respect to compression performance, we definitely see the same
> thing, no debate.
>
> Of course if you want to just compress the message payloads you can do that
> now without needing much help from kafka--just pass in the compressed data.
> Whether or not it will do much depends on the size of the message body--for
> small messages you basically need batch compression, but for large messages
> just compressing the body is fine. Our extra effort was to get the better
> compression ratio of compressed messages.
>
> What I was saying about snappy performance is that I think it may be our
> inefficiency in the compression code-path rather than the underlying
> slowness of snappy. For example on this page
>   https://github.com/dain/snappy
> The compression performance they list for jni (the library we use) tends to
> be around 200MB per core-second, with decompression around 1GB per
> core-second. So on a modern machine with umpteen cores that should not be a
> bottleneck, right? I don't know this to be true but I am wondering if the
> underlying bottleneck is the compression algorithm or our inefficient
> code. If you look at kafka.message.ByteBufferMessageSet.{create,
> decompress, and assignOffsets} it is pretty inefficient. I did a round of
> improvement there but we are still recopying stuff over and over and
> creating zillions of little buffers and objects. It is a little tricky to
> clean up but probably just a 1-2 day project.
>
> I would rather figure out that it is really the compression that is the
> root cause rather than just our inefficiency before we do anything too
> drastic design-wise. If this is really killing you guys, and if that turns
> out to be the cause, we would definitely take a patch to optimize that path
> now.
>
> -Jay
>
>
>
>
> On Fri, Aug 2, 2013 at 4:55 PM, Chris Hogue <cs...@gmail.com> wrote:
>
> > Thanks for the responses. Additional follow-up inline.
> >
> >
> > On Fri, Aug 2, 2013 at 2:21 PM, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > Great comments, answers inline!
> > >
> > > On Fri, Aug 2, 2013 at 12:28 PM, Chris Hogue <cs...@gmail.com>
> wrote:
> > >
> > > > These sound like great steps. A couple of votes and questions:
> > > >
> > > > 1.  Moving serialization out and basing it all off of byte[] for key
> > and
> > > > payload makes sense. Echoing a response below, we've ended up doing
> > that
> > > in
> > > > some cases anyway, and the others do a trivial transform to bytes
> with
> > an
> > > > Encoder.
> > > >
> > >
> > > Cool.
> > >
> > >
> > > > 2. On the single producer thread, we're actually suffering a bit from
> > > this
> > > > in 0.8, but it's mostly because compression and the blocking send
> > happen
> > > on
> > > > this thread. In 0.7 since there was a thread-per-broker, a nice
> > > side-effect
> > > > was that compression and the blocking could "go wide", at least to
> the
> > > > number of brokers. If compression is moved out and the sends are now
> > > > non-blocking then this sounds like a nice improvement.
> > > >
> > >
> > > I think even in 0.7 there was only one thread, right?
> > >
> > >
> > I believe it was actually 1 per broker. Producer.scala iterates the
> brokers
> > and adds a new producer for each. The ProducerPool.addProducer() method
> > adds a new AsyncProducer instance for the broker (assuming async mode),
> and
> > each AsyncProducer creates and starts its own ProducerSendThread.
> >
> > In either case, going to multiplexed I/O and not having the compression
> on
> > this thread probably solves any issue there.
> >
> >
> >
> > >
> > > > 3. The wiki talks about static partition assignment for consumers.
> Just
> > > > adding a vote for that as we're currently working through how to do
> > that
> > > > ourselves with the 0.8 consumer.
> > > >
> > >
> > > Cool, yeah currently you must use the simple consumer to get that which
> > is
> > > a pain.
> > >
> > >
> > > > 4. I'm curious how compression would interact with the new ByteBuffer
> > > > buffering you've described. If I'm following correctly you've said
> that
> > > > rather than queueing objects you'd end up doing in-place writes to
> the
> > > > pre-allocated ByteBuffer. Presumably this means the compression has
> > > already
> > > > happened on the user thread. But if there's no batching/buffering
> > except
> > > in
> > > > the ByteBuffer, is there somewhere that multiple messages will be
> > > > compressed together (since it should result in better compression)?
> > Maybe
> > > > there's still batching before this and I read too much into it?
> > > >
> > >
> > > I'm not 100% sure, but I believe the compression can still be done
> > inline.
> > > The compression algorithm will buffer a bit, of course. What we
> currently
> > > do though is write out the full data uncompressed and then compress it.
> > > This is pretty inefficient. Basically we are using Java's OutputStream
> > apis
> > > for compression but we need to be using the lower-level array oriented
> > > algorithms like (Deflater). I haven't tried this but my assumption is
> > that
> > > we can compress the messages as they arrive into the destination buffer
> > > instead of the current approach.
> > >
> >
> >
> > Right, was starting to think you may be looking at a way of doing the
> > compression incrementally as they come in. Sounds like what you're
> > pursuing.
> >
> >
> >
> > >
> > >
> > > > 5. I don't know if this is quite the right place to discuss it, but
> > since
> > > > the producer has some involvement I'll throw it out there. The
> > > un-compress,
> > > > assign offsets, re-compress that happens on the broker with the
> > built-in
> > > > compression API is a significant bottleneck that we're really trying
> to
> > > > avoid. As noted in another thread, we saw a throughput increase on
> the
> > > > order of 3x when we pre-batched and compressed the payloads before
> > > sending
> > > > it to the producer with 0.8.
> > > >
> > >
> > > Yes, it is a bummer. We think ultimately this does make sense though,
> for
> > > two reasons beyond offsets:
> > > 1. You have to validate the integrity of the data the client has sent
> to
> > > you or else one bad or buggy client can screw up all consumers.
> > > 2. The compression of the log should not be tied to the compression
> used
> > by
> > > individual producers. We haven't made this change yet, but it is an
> easy
> > > one. The problem today is that if your producers send a variety of
> > > compression types your consumers need to handle the union of all types
> > and
> > > you have no guarantee over what types producers may send in the future.
> > > Instead we think these should be decoupled. The topic should have a
> > > compression type property and that should be totally decoupled from the
> > > compression type the producer uses. In many cases there is no real need
> > > for the producer to use compression at all, as the real thing you want
> > > to optimize is later inter-datacenter transfers, not the network send
> > > to the local broker, so the producer can just send uncompressed and
> > > have the broker control the compression type.
> > >
> > > The performance really has two causes though:
> > > 1. GZIP is super slow, especially java's implementation. But snappy,
> for
> > > example, is actually quite fast. We should be able to do snappy at
> > network
> > > speeds according to the perf data I have seen, but...
> > > 2. ...our current compression code is kind of inefficient due to all
> the
> > > copying and traversal, due to the reasons cited above.
> > >
> > > So in other words I think we can make this a bit better but it probably
> > > won't go away. How do you feel about snappy?
> > >
> > >
> > Sorry, I should have been more clear--these tests were all with Snappy
> (the
> > same library Kafka uses, just called directly from our code before it
> went
> > to the producer). I did an early GZIP test and it was just too far out of
> > the ballpark to be useful.
> >
> > I completely understand the architectural separation and the value you're
> > describing here, especially in a general solution where you may have many
> > heterogenous producer and consumer types. In our case it will be pretty
> > homogeneous and throughput is a primary concern, hence the focus on this.
> >
> > I started putting in a description of the benchmarks we did but it's
> going
> > to blow up this thread, so it's probably best if that goes in its own
> > separate thread. The summary is that at an application level, this change
> > alone is the difference between being able to send 25,000 (2.5KB)
> > messages/sec to a single broker vs over 80,000/sec. For comparison, I
> did a
> > small test that simply wrote messages to the kafka logs via the Log class
> > (in a standalone app on that machine, not through a server) and saw
> around
> > ~170,000 messages/sec. The throughput to the disk as reported by iostat
> > reflected a similar change.
> >
> > Obviously without more detail you'll have to take those numbers as a
> rough
> > sketch, and I'm happy to give more detail separately, but that's a high
> > enough cost on the broker that we really think we need to avoid it.
> >
> >
> >
> > > > I've not looked very closely at the wire-protocol, but if there was a
> > way
> > > > for it to support in-place offset assignment even for compressed
> > messages
> > > > it would be a huge win. Short of that we're fine taking the
> > > batch/compress
> > > > responsibility into user code, but it would be nice to have a way to
> do
> > > > that while retaining the built-in partition selection (i.e. semantic
> > > > partitioning) and other functionality of the producer. The new design
> > may
> > > > already be an improvement in this area since it would move some
> > > > responsibility to the user thread.
> > > >
> > >
> > > We can't really do this because we are multi-writer so any offset we
> give
> > > the client would potentially be used by another producer and then be
> > > invalid or non-sequential.
> > >
> >
> > I may have said this in a confusing way. With the tests we did it was
> still
> > the broker assigning offsets, it's just that the message as a whole
> wasn't
> > compressed, only the payloads. So the broker still had plain-bytes access
> > to the headers and went through the optimized code path that exists for
> > non-compressed messages.
> >
> >
> > Really appreciate your responses and glad to see this making progress.
> >
> >
> >
> > >
> > > >
> > > > Not sure if that's clear, but as the interfaces take shape it may be
> > > easier
> > > > to see how that will work.
> > > >
> > > > -Chris
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Jul 26, 2013 at 1:00 PM, Jay Kreps <ja...@gmail.com>
> > wrote:
> > > >
> > > > > I sent around a wiki a few weeks back proposing a set of client
> > > > > improvements that essentially amount to a rewrite of the producer
> and
> > > > > consumer java clients.
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > > > >
> > > > > The below discussion assumes you have read this wiki.
> > > > >
> > > > > I started to do a little prototyping for the producer and wanted to
> > > share
> > > > > some of the ideas that came up to get early feedback.
> > > > >
> > > > > First, a few simple but perhaps controversial things to discuss.
> > > > >
> > > > > Rollout
> > > > > Phase 1: We add the new clients. No change on the server. Old
> clients
> > > > still
> > > > > exist. The new clients will be entirely in a new package so there
> > will
> > > be
> > > > > no possibility of name collision.
> > > > > Phase 2: We swap out all shared code on the server to use the new
> > > client
> > > > > stuff. At this point the old clients still exist but are
> essentially
> > > > > deprecated.
> > > > > Phase 3: We remove the old client code.
> > > > >
> > > > > Java
> > > > > I think we should do the clients in java. Making our users deal with
> > > > > scala's non-compatibility issues and crazy stack traces causes people
> > > > > a lot of pain. Furthermore we end up having to wrap everything now to
> > > > > get a usable java api anyway for non-scala people. This does mean
> > > > > maintaining a substantial chunk of java code, which is maybe less fun
> > > > > than scala. But basically I think we should optimize for the end user
> > > > > and produce a standalone pure-java jar with no dependencies.
> > > > >
> > > > > Jars
> > > > > We definitely want to separate out the client jar. There is also a
> > fair
> > > > > amount of code shared between both (exceptions, protocol
> definition,
> > > > utils,
> > > > > and the message set implementation). Two approaches.
> > > > > Two jar approach: split kafka.jar into kafka-clients.jar and
> > > > > kafka-server.jar with the server depending on the clients. The
> > > > > advantage of this is that it is simple. The disadvantage is that
> > > > > things like utils and protocol definition will be in the client jar
> > > > > though technically they belong equally to the server.
> > > > > Many jar approach: split kafka.jar into kafka-common.jar,
> > > > > kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
> > > > > kafka-server.jar. The disadvantage of this is that the user needs
> two
> > > > jars
> > > > > (common + something) which is for sure going to confuse people. I
> > also
> > > > > think this will tend to spawn more jars over time.
> > > > >
> > > > > Background threads
> > > > > I am thinking of moving both serialization and compression out of
> the
> > > > > background send thread. I will explain a little about this idea
> > below.
> > > > >
> > > > > Serialization
> > > > > I am not sure if we should handle serialization in the client at
> all.
> > > > > Basically I wonder if our own API wouldn't just be a lot simpler if
> > we
> > > > took
> > > > > a byte[] key and byte[] value and let people serialize stuff
> > > themselves.
> > > > > Injecting a class name for us to create the serializer is more
> > > roundabout
> > > > > and has a lot of problems if the serializer itself requires a lot
> of
> > > > > configuration or other objects to be instantiated.
> > > > >
> > > > > Partitioning
> > > > > The real question with serialization is whether the partitioning
> > should
> > > > > happen on the java object or on the byte array key. The argument
> for
> > > > doing
> > > > > it on the java object is that it is easier to do something like a
> > range
> > > > > partition on the object. The problem with doing it on the object is
> > > that
> > > > > the consumer may not be in java and so may not be able to reproduce
> > the
> > > > > partitioning. For example we currently use Object.hashCode which
> is a
> > > > > little sketchy. We would be better off doing a standardized hash
> > > function
> > > > > on the key bytes. If we want to give the partitioner access to the
> > > > original
> > > > > java object then obviously we need to handle serialization behind
> our
> > > > api.
> > > > >
> > > > > Names
> > > > > I think good names are important. I would like to rename the
> > following
> > > > > classes in the new client:
> > > > >   Message=>Record: Now that the message has both a message and a
> key
> > it
> > > > is
> > > > > more of a KeyedMessage. Another name for a KeyedMessage is a
> Record.
> > > > >   MessageSet=>Records: This isn't too important but nit pickers
> > > complain
> > > > > that it is not technically a Set but rather a List or Sequence but
> > > > > MessageList sounds funny to me.
> > > > >
> > > > > The actual clients will not interact with these classes. They will
> > > > interact
> > > > > with a ProducerRecord and ConsumerRecord. The reason for having
> > > different
> > > > > fields is because the different clients
> > > > > Proposed producer API:
> > > > > SendResponse r = producer.send(new ProducerRecord(topic, key, value))
> > > > >
> > > > > Protocol Definition
> > > > >
> > > > > Here is what I am thinking about protocol definition. I see a couple
> > > > > of problems with what we are doing currently. First the protocol
> > > > > definition is spread throughout a bunch of custom java objects. The
> > > > > error reporting in these objects is really terrible because they
> > > > > don't record the field names. Furthermore people keep adding business
> > > > > logic into the protocol objects which is pretty nasty.
> > > > >
> > > > > I would like to move to having a single Protocol.java file that
> > defines
> > > > the
> > > > > protocol in a readable DSL. Here is what I am thinking:
> > > > >
> > > > >   public static Schema REQUEST_HEADER =
> > > > >     new Schema(new Field("api_key", INT16, "The id of the request type."),
> > > > >                new Field("api_version", INT16, "The version of the API."),
> > > > >                new Field("correlation_id", INT32,
> > > > >                          "A user-supplied integer value that will be passed back with the response"),
> > > > >                new Field("client_id", STRING,
> > > > >                          "A user specified identifier for the client making the request."));
> > > > >
> > > > > To parse one of these requests you would do
> > > > >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> > > > >    short apiKey = struct.get("api_key");
> > > > >
> > > > > Internally Struct is just an Object[] with one entry per field
> which
> > is
> > > > > populated from the schema. The mapping of name to array index is a
> > hash
> > > > > table lookup. We can optimize access for performance critical areas
> > by
> > > > > allowing:
> > > > >    // do this once to lookup the index of the field
> > > > >    static Field apiKeyField = REQUEST_HEADER.getField("api_key");
> > > > >    ...
> > > > >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> > > > >    short apiKey = struct.get(apiKeyField); // now this is just an array access
> > > > >
> > > > > One advantage of this is this level of indirection will make it
> > > > > really easy for us to handle backwards compatibility in a more
> > > > > principled way. The protocol file will actually contain ALL versions
> > > > > of the schema and we will always use the appropriate version to read
> > > > > the request (as specified in the header).
> > > > >
> > > > > NIO layer
> > > > >
> > > > > The plan is to add a non-blocking multi-connection abstraction that
> > > would
> > > > > be used by both clients.
> > > > >
> > > > > class Selector {
> > > > >   /* create a new connection and associate it with the given id */
> > > > >   public void connect(int id, InetSocketAddress address,
> > > > >       int sendBufferSize, int receiveBufferSize)
> > > > >   /* wakeup this selector if it is currently awaiting data */
> > > > >   public void wakeup()
> > > > >   /* user provides sends, receives, and a timeout. this method will
> > > > >      populate "completed" and "disconnects" lists. Method blocks for up
> > > > >      to the timeout waiting for data to read. */
> > > > >   public void poll(long timeout, List<Send> sends, List<Send> completed,
> > > > >       List<Receive> receives, List<Integer> disconnects)
> > > > > }
> > > > >
> > > > > The consumer and producer would then each define their own logic to
> > > > manage
> > > > > their set of in-flight requests.
> > > > >
> > > > > Producer Implementation
> > > > >
> > > > > There are a couple of interesting changes I think we can make to
> the
> > > > > producer implementation.
> > > > >
> > > > > We retain the single background "sender" thread.
> > > > >
> > > > > But we can remove the definition of sync vs async clients. We
> always
> > > > return
> > > > > a "future" response immediately. Both sync and async sends would go
> > > > through
> > > > > the buffering that we currently do for the async layer. This would
> > mean
> > > > > that even in sync mode while the event loop is doing network IO if
> > many
> > > > > requests accumulate they will be sent in a single batch. This
> > > effectively
> > > > > acts as a kind of "group commit". So instead of having an "async"
> > mode
> > > > that
> > > > > acts differently in some way you just have a max.delay time that
> > > controls
> > > > > how long the client will linger waiting for more data to
> accumulate.
> > > > > max.delay=0 is equivalent to the current sync producer.
> > > > >
> > > > > I would also propose changing our buffering strategy. Currently we
> > > queue
> > > > > unserialized requests in a BlockingQueue. This is not ideal as it
> is
> > > very
> > > > > difficult to reason about the memory usage of this queue. One 5MB
> > > message
> > > > > may be bigger than 10k small messages. I propose that (1) we change
> > our
> > > > > queuing strategy to queue per-partition and (2) we directly write
> the
> > > > > messages to the ByteBuffer which will eventually be sent and use
> that
> > > as
> > > > > the "queue". The batch size should likewise be in bytes not in
> number
> > > of
> > > > > messages.
> > > > >
> > > > > If you think about it our current queuing strategy doesn't really
> > make
> > > > > sense any more now that we always load balance over brokers. You
> set
> > a
> > > > > batch size of N and we do a request when we have N messages in
> queue
> > > but
> > > > > this says nothing about the size of the requests that will be sent.
> > You
> > > > > might end up sending all N messages to one server or you might end
> up
> > > > > sending 1 message to N different servers (totally defeating the
> > purpose
> > > > of
> > > > > batching).
> > > > >
> > > > > There are two granularities of batching that could make sense: the
> > > broker
> > > > > level or the partition level. We do the send requests at the broker
> > > level
> > > > > but we do the disk IO at the partition level. I propose making the
> > > queues
> > > > > per-partition rather than per broker to avoid having to reshuffle
> the
> > > > > contents of queues when leadership changes. This could be debated,
> > > > though.
> > > > >
> > > > > If you actually look at the byte path of the producer this approach
> > > > > allows cleaning a ton of stuff up. We can do in-place writes to the
> > > > > destination buffer that we will eventually send. This does mean
> > > > > moving serialization and compression to the user thread. But I think
> > > > > this is good as these may be slow but aren't unpredictably slow.
> > > > >
> > > > > The per-partition queues are thus implemented with a bunch of
> > > > pre-allocated
> > > > > ByteBuffers sized to max.batch.size, when the buffer is full or the
> > > delay
> > > > > time elapses that buffer is sent.
> > > > >
> > > > > By doing this we could actually just reuse the buffers when the
> send
> > is
> > > > > complete. This would be nice because since the buffers are used for
> > > > > allocation they will likely fall out of young gen.
> > > > >
> > > > > A question to think about is how we want to bound memory usage. I
> > > > > think what we want is the max.batch.size which controls the size of
> > > > > the individual buffers and total.buffer.memory which controls the
> > > > > total memory used by all buffers. One problem with this is that there
> > > > > is the possibility of some fragmentation. I.e. imagine a situation
> > > > > with 5k partitions being produced to, each getting a low but steady
> > > > > message rate. Giving each of these a 1MB buffer would require 5GB of
> > > > > buffer space to have a buffer for each partition. I'm not sure how
> > > > > bad this is since at least the memory usage is predictable and the
> > > > > reality is that holding thousands of java objects has huge overhead
> > > > > compared to contiguous byte arrays.
> > > > >
> > > > > -Jay
> > > > >
> > > >
> > >
> >
>

Re: Client improvement discussion

Posted by Jay Kreps <ja...@gmail.com>.
Cool. With respect to compression performance, we definitely see the same
thing, no debate.

Of course if you want to just compress the message payloads you can do that
now without needing much help from kafka--just pass in the compressed data.
Whether or not it will do much depends on the size of the message body--for
small messages you basically need batch compression, but for large messages
just compressing the body is fine. Our extra effort was to get the better
compression ratio of compressed messages.
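
To illustrate the point about small messages needing batch compression,
here is a minimal sketch (not Kafka code) that compresses the same small
messages one at a time and then as one concatenated batch. It uses
java.util.zip.Deflater only because it ships with the JDK; the thread is
about snappy, and the exact ratios will differ. The class name, helper
names, and sample messages are made up for the example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

class PayloadCompressionSketch {
    /** Compress one byte array with Deflater and return the compressed bytes. */
    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(chunk);
            out.write(chunk, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    /** Total compressed bytes when every message is compressed on its own. */
    static int perMessageSize(byte[][] messages) {
        int total = 0;
        for (byte[] m : messages) {
            total += compress(m).length;
        }
        return total;
    }

    /** Compressed bytes when all messages are concatenated and compressed once. */
    static int batchSize(byte[][] messages) {
        ByteArrayOutputStream all = new ByteArrayOutputStream();
        for (byte[] m : messages) {
            all.write(m, 0, m.length);
        }
        return compress(all.toByteArray()).length;
    }

    /** Hypothetical small, similar-looking messages (a few dozen bytes each). */
    static byte[][] sampleMessages(int count) {
        byte[][] msgs = new byte[count][];
        for (int i = 0; i < count; i++) {
            msgs[i] = ("{\"user\":" + i + ",\"event\":\"click\"}")
                    .getBytes(StandardCharsets.UTF_8);
        }
        return msgs;
    }

    public static void main(String[] args) {
        byte[][] msgs = sampleMessages(1000);
        System.out.println("per-message: " + perMessageSize(msgs) + " bytes");
        System.out.println("batched:     " + batchSize(msgs) + " bytes");
    }
}
```

With messages this small the per-message total should come out no better
than the raw input, while the batch shrinks substantially, because the
compressor can only exploit repetition it sees within a single stream.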

What I was saying about snappy performance is that I think it may be our
inefficiency in the compression code-path rather than the underlying
slowness of snappy. For example on this page
  https://github.com/dain/snappy
The compression performance they list for jni (the library we use) tends to
be around 200MB per core-second, with decompression around 1GB per
core-second. So on a modern machine with umpteen cores that should not be a
bottleneck, right? I don't know this to be true but I am wondering if the
underlying bottleneck is the compression algorithm or our inefficient
code. If you look at kafka.message.ByteBufferMessageSet.{create,
decompress, and assignOffsets} it is pretty inefficient. I did a round of
improvement there but we are still recopying stuff over and over and
creating zillions of little buffers and objects. It is a little tricky to
clean up but probably just a 1-2 day project.
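
One possible shape for a cheaper path, sketched under assumptions: compress
each record as it arrives, straight into a single pre-allocated destination
array, instead of writing the whole batch uncompressed and compressing it
afterwards. This is not the ByteBufferMessageSet code. The class
InPlaceBatchCompressor, the 4-byte length prefix, and the readBack helper
are hypothetical, and java.util.zip.Deflater stands in for snappy.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

class InPlaceBatchCompressor {
    private final Deflater deflater = new Deflater();
    private final byte[] dest;                        // pre-allocated destination buffer
    private final byte[] lengthPrefix = new byte[4];  // reused scratch for the record length
    private int pos = 0;                              // next free byte in dest

    InPlaceBatchCompressor(int capacity) {
        this.dest = new byte[capacity];
    }

    /** Append one record as (4-byte length, body), compressing directly into dest. */
    void append(byte[] record) {
        ByteBuffer.wrap(lengthPrefix).putInt(record.length);
        feed(lengthPrefix);
        feed(record);
    }

    /** Hand input to the deflater and drain output until the input is consumed. */
    private void feed(byte[] input) {
        deflater.setInput(input);
        while (!deflater.needsInput()) {
            drain();
        }
    }

    /** Move whatever compressed bytes are ready into the free part of dest. */
    private void drain() {
        if (pos == dest.length) {
            throw new BufferOverflowException();
        }
        pos += deflater.deflate(dest, pos, dest.length - pos);
    }

    /** Finish the stream and return a view over the compressed bytes in dest. */
    ByteBuffer finish() {
        deflater.finish();
        while (!deflater.finished()) {
            drain();
        }
        deflater.end();
        return ByteBuffer.wrap(dest, 0, pos);
    }

    /** Demo-only helper: inflate the batch and split it back into records. */
    static List<byte[]> readBack(ByteBuffer compressed, int maxPlainBytes) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed.array(),
                compressed.arrayOffset() + compressed.position(), compressed.remaining());
        byte[] plain = new byte[maxPlainBytes];
        int n = 0;
        try {
            while (!inflater.finished() && n < plain.length) {
                int read = inflater.inflate(plain, n, plain.length - n);
                if (read == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    break;
                }
                n += read;
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException(e);
        } finally {
            inflater.end();
        }
        ByteBuffer in = ByteBuffer.wrap(plain, 0, n);
        List<byte[]> records = new ArrayList<>();
        while (in.remaining() >= 4) {
            byte[] rec = new byte[in.getInt()];
            in.get(rec);
            records.add(rec);
        }
        return records;
    }
}
```

The only allocations per record here are whatever the caller already made;
the destination array and the length scratch are reused, which is the kind
of saving the recopying complaint above is pointing at.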

I would rather figure out that it is really the compression that is the
root cause rather than just our inefficiency before we do anything too
drastic design-wise. If this is really killing you guys, and if that turns
out to be the cause, we would definitely take a patch to optimize that path
now.
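
A rough back-of-envelope check, using only figures quoted in this thread:
about 200MB and 1GB per core-second for snappy compression and
decompression, and 25,000 versus 80,000 messages/sec at 2.5KB each from
Chris's test. The helper names are hypothetical, and the model is a
simplifying assumption: one core, every byte decompressed and then
recompressed, 1MB = 1000KB.

```java
class CompressionBudgetSketch {
    /** MB/s one core sustains if every byte is decompressed and then recompressed. */
    static double roundTripMBPerSec(double compressMBPerSec, double decompressMBPerSec) {
        return 1.0 / (1.0 / compressMBPerSec + 1.0 / decompressMBPerSec);
    }

    /** Payload rate in MB/s for a given message rate and message size in KB. */
    static double payloadMBPerSec(double messagesPerSec, double messageKB) {
        return messagesPerSec * messageKB / 1000.0;
    }

    public static void main(String[] args) {
        System.out.printf("one-core round-trip budget: %.1f MB/s%n",
                roundTripMBPerSec(200, 1000));
        System.out.printf("25,000 msgs/sec at 2.5KB:   %.1f MB/s%n",
                payloadMBPerSec(25_000, 2.5));
        System.out.printf("80,000 msgs/sec at 2.5KB:   %.1f MB/s%n",
                payloadMBPerSec(80_000, 2.5));
    }
}
```

Under those assumptions a single core could decompress and recompress
roughly 167MB/s, while 25,000 messages/sec is only 62.5MB/s. That is
consistent with the suspicion that the code path rather than snappy itself
is the limit, but it is not proof; profiling the broker would settle it.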

-Jay




On Fri, Aug 2, 2013 at 4:55 PM, Chris Hogue <cs...@gmail.com> wrote:

> Thanks for the responses. Additional follow-up inline.
>
>
> On Fri, Aug 2, 2013 at 2:21 PM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Great comments, answers inline!
> >
> > On Fri, Aug 2, 2013 at 12:28 PM, Chris Hogue <cs...@gmail.com> wrote:
> >
> > > These sound like great steps. A couple of votes and questions:
> > >
> > > 1.  Moving serialization out and basing it all off of byte[] for key
> and
> > > payload makes sense. Echoing a response below, we've ended up doing
> that
> > in
> > > some cases anyway, and the others do a trivial transform to bytes with
> an
> > > Encoder.
> > >
> >
> > Cool.
> >
> >
> > > 2. On the single producer thread, we're actually suffering a bit from
> > this
> > > in 0.8, but it's mostly because compression and the blocking send
> happen
> > on
> > > this thread. In 0.7 since there was a thread-per-broker, a nice
> > side-effect
> > > was that compression and the blocking could "go wide", at least to the
> > > number of brokers. If compression is moved out and the sends are now
> > > non-blocking then this sounds like a nice improvement.
> > >
> >
> > I think even in 0.7 there was only one thread, right?
> >
> >
> I believe it was actually 1 per broker. Producer.scala iterates the brokers
> and adds a new producer for each. The ProducerPool.addProducer() method
> adds a new AsyncProducer instance for the broker (assuming async mode), and
> each AsyncProducer creates and starts its own ProducerSendThread.
>
> In either case, going to multiplexed I/O and not having the compression on
> this thread probably solves any issue there.
>
>
>
> >
> > > 3. The wiki talks about static partition assignment for consumers. Just
> > > adding a vote for that as we're currently working through how to do
> that
> > > ourselves with the 0.8 consumer.
> > >
> >
> > Cool, yeah currently you must use the simple consumer to get that which
> is
> > a pain.
> >
> >
> > > 4. I'm curious how compression would interact with the new ByteBuffer
> > > buffering you've described. If I'm following correctly you've said that
> > > rather than queueing objects you'd end up doing in-place writes to the
> > > pre-allocated ByteBuffer. Presumably this means the compression has
> > already
> > > happened on the user thread. But if there's no batching/buffering
> except
> > in
> > > the ByteBuffer, is there somewhere that multiple messages will be
> > > compressed together (since it should result in better compression)?
> Maybe
> > > there's still batching before this and I read too much into it?
> > >
> >
> > I'm not 100% sure, but I believe the compression can still be done inline.
> > The compression algorithm will buffer a bit, of course. What we currently
> > do though is write out the full data uncompressed and then compress it.
> > This is pretty inefficient. Basically we are using Java's OutputStream APIs
> > for compression but we need to be using the lower-level array-oriented
> > algorithms (like Deflater). I haven't tried this but my assumption is that
> > we can compress the messages as they arrive into the destination buffer
> > instead of the current approach.
> >
>
>
> Right, was starting to think you may be looking at a way of doing the
> compression incrementally as they come in. Sounds like what you're
> pursuing.
>
>
>
> >
> >
> > > 5. I don't know if this is quite the right place to discuss it, but
> since
> > > the producer has some involvement I'll throw it out there. The
> > un-compress,
> > > assign offsets, re-compress that happens on the broker with the
> built-in
> > > compression API is a significant bottleneck that we're really trying to
> > > avoid. As noted in another thread, we saw a throughput increase on the
> > > order of 3x when we pre-batched and compressed the payloads before
> > sending
> > > it to the producer with 0.8.
> > >
> >
> > Yes, it is a bummer. We think ultimately this does make sense though, for
> > two reasons beyond offsets:
> > 1. You have to validate the integrity of the data the client has sent to
> > you or else one bad or buggy client can screw up all consumers.
> > 2. The compression of the log should not be tied to the compression used by
> > individual producers. We haven't made this change yet, but it is an easy
> > one. The problem today is that if your producers send a variety of
> > compression types your consumers need to handle the union of all types and
> > you have no guarantee over what types producers may send in the future.
> > Instead we think these should be decoupled. The topic should have a
> > compression type property and that should be totally decoupled from the
> > compression type the producer uses. In many cases there is no real need for
> > the producer to use compression at all, as the real thing you want to
> > optimize is later inter-datacenter transfers, not the network send to the
> > local broker, so the producer can just send uncompressed and have the broker
> > control the compression type.
> >
> > The performance really has two causes though:
> > 1. GZIP is super slow, especially java's implementation. But snappy, for
> > example, is actually quite fast. We should be able to do snappy at
> network
> > speeds according to the perf data I have seen, but...
> > 2. ...our current compression code is kind of inefficient due to all the
> > copying and traversal, due to the reasons cited above.
> >
> > So in other words I think we can make this a bit better but it probably
> > won't go away. How do you feel about snappy?
> >
> >
> Sorry, I should have been more clear--these tests were all with Snappy (the
> same library Kafka uses, just called directly from our code before it went
> to the producer). I did an early GZIP test and it was just too far out of
> the ballpark to be useful.
>
> I completely understand the architectural separation and the value you're
> describing here, especially in a general solution where you may have many
> heterogeneous producer and consumer types. In our case it will be pretty
> homogeneous and throughput is a primary concern, hence the focus on this.
>
> I started putting in a description of the benchmarks we did but it's going
> to blow up this thread, so it's probably best if that goes in its own
> separate thread. The summary is that at an application level, this change
> alone is the difference between being able to send 25,000 (2.5KB)
> messages/sec to a single broker vs over 80,000/sec. For comparison, I did a
> small test that simply wrote messages to the kafka logs via the Log class
> (in a standalone app on that machine, not through a server) and saw around
> 170,000 messages/sec. The throughput to the disk as reported by iostat
> reflected a similar change.
>
> Obviously without more detail you'll have to take those numbers as a rough
> sketch, and I'm happy to give more detail separately, but that's a high
> enough cost on the broker that we really think we need to avoid it.
>
>
>
> > > I've not looked very closely at the wire-protocol, but if there was a
> way
> > > for it to support in-place offset assignment even for compressed
> messages
> > > it would be a huge win. Short of that we're fine taking the
> > batch/compress
> > > responsibility into user code, but it would be nice to have a way to do
> > > that while retaining the built-in partition selection (i.e. semantic
> > > partitioning) and other functionality of the producer. The new design
> may
> > > already be an improvement in this area since it would move some
> > > responsibility to the user thread.
> > >
> >
> > We can't really do this because we are multi-writer so any offset we give
> > the client would potentially be used by another producer and then be
> > invalid or non-sequential.
> >
>
> I may have said this in a confusing way. With the tests we did it was still
> the broker assigning offsets, it's just that the message as a whole wasn't
> compressed, only the payloads. So the broker still had plain-bytes access
> to the headers and went through the optimized code path that exists for
> non-compressed messages.
>
>
> Really appreciate your responses and glad to see this making progress.
>
>
>
> >
> > >
> > > Not sure if that's clear, but as the interfaces take shape it may be
> > easier
> > > to see how that will work.
> > >
> > > -Chris
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Jul 26, 2013 at 1:00 PM, Jay Kreps <ja...@gmail.com>
> wrote:
> > >
> > > > I sent around a wiki a few weeks back proposing a set of client
> > > > improvements that essentially amount to a rewrite of the producer and
> > > > consumer java clients.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > > >
> > > > The below discussion assumes you have read this wiki.
> > > >
> > > > I started to do a little prototyping for the producer and wanted to
> > share
> > > > some of the ideas that came up to get early feedback.
> > > >
> > > > First, a few simple but perhaps controversial things to discuss.
> > > >
> > > > Rollout
> > > > Phase 1: We add the new clients. No change on the server. Old clients
> > > still
> > > > exist. The new clients will be entirely in a new package so there
> will
> > be
> > > > no possibility of name collision.
> > > > Phase 2: We swap out all shared code on the server to use the new
> > client
> > > > stuff. At this point the old clients still exist but are essentially
> > > > deprecated.
> > > > Phase 3: We remove the old client code.
> > > >
> > > > Java
> > > > > I think we should do the clients in java. Making our users deal with
> > > > > scala's non-compatibility issues and crazy stack traces causes people a lot
> > > > > of pain. Furthermore we end up having to wrap everything now to get a
> > > > > usable java api anyway for non-scala people. This does mean maintaining a
> > > > > substantial chunk of java code, which is maybe less fun than scala. But
> > > > > basically I think we should optimize for the end user and produce a
> > > > > standalone pure-java jar with no dependencies.
> > > >
> > > > Jars
> > > > We definitely want to separate out the client jar. There is also a
> fair
> > > > amount of code shared between both (exceptions, protocol definition,
> > > utils,
> > > > and the message set implementation). Two approaches.
> > > > > Two jar approach: split kafka.jar into kafka-clients.jar and
> > > > > kafka-server.jar with the server depending on the clients. The advantage of
> > > > > this is that it is simple. The disadvantage is that things like utils and
> > > > > protocol definition will be in the client jar though technically they belong
> > > > > equally to the server.
> > > > Many jar approach: split kafka.jar into kafka-common.jar,
> > > > kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
> > > > kafka-server.jar. The disadvantage of this is that the user needs two
> > > jars
> > > > (common + something) which is for sure going to confuse people. I
> also
> > > > think this will tend to spawn more jars over time.
> > > >
> > > > Background threads
> > > > I am thinking of moving both serialization and compression out of the
> > > > background send thread. I will explain a little about this idea
> below.
> > > >
> > > > Serialization
> > > > I am not sure if we should handle serialization in the client at all.
> > > > Basically I wonder if our own API wouldn't just be a lot simpler if
> we
> > > took
> > > > a byte[] key and byte[] value and let people serialize stuff
> > themselves.
> > > > Injecting a class name for us to create the serializer is more
> > roundabout
> > > > and has a lot of problems if the serializer itself requires a lot of
> > > > configuration or other objects to be instantiated.
> > > >
> > > > Partitioning
> > > > The real question with serialization is whether the partitioning
> should
> > > > happen on the java object or on the byte array key. The argument for
> > > doing
> > > > it on the java object is that it is easier to do something like a
> range
> > > > partition on the object. The problem with doing it on the object is
> > that
> > > > the consumer may not be in java and so may not be able to reproduce
> the
> > > > partitioning. For example we currently use Object.hashCode which is a
> > > > little sketchy. We would be better off doing a standardized hash
> > function
> > > > on the key bytes. If we want to give the partitioner access to the
> > > original
> > > > java object then obviously we need to handle serialization behind our
> > > api.
> > > >
> > > > Names
> > > > I think good names are important. I would like to rename the
> following
> > > > classes in the new client:
> > > >   Message=>Record: Now that the message has both a message and a key
> it
> > > is
> > > > more of a KeyedMessage. Another name for a KeyedMessage is a Record.
> > > >   MessageSet=>Records: This isn't too important but nit pickers
> > complain
> > > > that it is not technically a Set but rather a List or Sequence but
> > > > MessageList sounds funny to me.
> > > >
> > > > The actual clients will not interact with these classes. They will
> > > interact
> > > > with a ProducerRecord and ConsumerRecord. The reason for having
> > different
> > > > fields is because the different clients
> > > > Proposed producer API:
> > > > SendResponse r = producer.send(new ProducerRecord(topic, key, value))
> > > >
> > > > Protocol Definition
> > > >
> > > > Here is what I am thinking about protocol definition. I see a couple
> of
> > > > problems with what we are doing currently. First the protocol
> > definition
> > > is
> > > > spread throughout a bunch of custom java objects. The error reporting
> > in
> > > > these object is really terrible because they don't record the field
> > > names.
> > > > Furthermore people keep adding business logic into the protocol
> objects
> > > > which is pretty nasty.
> > > >
> > > > I would like to move to having a single Protocol.java file that
> defines
> > > the
> > > > protocol in a readable DSL. Here is what I am thinking:
> > > >
> > > > >   public static Schema REQUEST_HEADER =
> > > > >     new Schema(new Field("api_key", INT16, "The id of the request type."),
> > > > >                new Field("api_version", INT16, "The version of the API."),
> > > > >                new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"),
> > > > >                new Field("client_id", STRING, "A user specified identifier for the client making the request."));
> > > >
> > > > To parse one of these requests you would do
> > > >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> > > >    short apiKey = struct.get("api_key");
> > > >
> > > > Internally Struct is just an Object[] with one entry per field which
> is
> > > > populated from the schema. The mapping of name to array index is a
> hash
> > > > table lookup. We can optimize access for performance critical areas
> by
> > > > allowing:
> > > > >    // do this once to look up the index of the field
> > > > >    static Field apiKeyField = REQUEST_HEADER.getField("api_key");
> > > > >    ...
> > > > >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> > > > >    short apiKey = struct.get(apiKeyField); // now this is just an array access
> > > >
> > > > > One advantage of this is this level of indirection will make it really easy
> > > > > for us to handle backwards compatibility in a more principled way. The
> > > > > protocol file will actually contain ALL versions of the schema and we will
> > > > > always use the appropriate version to read the request (as specified in the
> > > > > header).
> > > >
> > > > NIO layer
> > > >
> > > > The plan is to add a non-blocking multi-connection abstraction that
> > would
> > > > be used by both clients.
> > > >
> > > > > class Selector {
> > > > >   /* create a new connection and associate it with the given id */
> > > > >   public void connect(int id, InetSocketAddress address, int sendBufferSize,
> > > > >                       int receiveBufferSize)
> > > > >   /* wakeup this selector if it is currently awaiting data */
> > > > >   public void wakeup()
> > > > >   /* user provides sends, receives, and a timeout. this method will
> > > > >      populate "completed" and "disconnects" lists. Method blocks for up to the
> > > > >      timeout waiting for data to read. */
> > > > >   public void poll(long timeout, List<Send> sends, List<Send> completed,
> > > > >                    List<Receive> receives, List<Integer> disconnects)
> > > > > }
> > > >
> > > > The consumer and producer would then each define their own logic to
> > > manage
> > > > their set of in-flight requests.
> > > >
> > > > Producer Implementation
> > > >
> > > > There are a couple of interesting changes I think we can make to the
> > > > producer implementation.
> > > >
> > > > We retain the single background "sender" thread.
> > > >
> > > > But we can remove the definition of sync vs async clients. We always
> > > return
> > > > a "future" response immediately. Both sync and async sends would go
> > > through
> > > > the buffering that we currently do for the async layer. This would
> mean
> > > > that even in sync mode while the event loop is doing network IO if
> many
> > > > requests accumulate they will be sent in a single batch. This
> > effectively
> > > > acts as a kind of "group commit". So instead of having an "async"
> mode
> > > that
> > > > acts differently in some way you just have a max.delay time that
> > controls
> > > > how long the client will linger waiting for more data to accumulate.
> > > > max.delay=0 is equivalent to the current sync producer.
> > > >
> > > > I would also propose changing our buffering strategy. Currently we
> > queue
> > > > unserialized requests in a BlockingQueue. This is not ideal as it is
> > very
> > > > difficult to reason about the memory usage of this queue. One 5MB
> > message
> > > > may be bigger than 10k small messages. I propose that (1) we change
> our
> > > > queuing strategy to queue per-partition and (2) we directly write the
> > > > messages to the ByteBuffer which will eventually be sent and use that
> > as
> > > > the "queue". The batch size should likewise be in bytes not in number
> > of
> > > > messages.
> > > >
> > > > If you think about it our current queuing strategy doesn't really
> make
> > > > sense any more now that we always load balance over brokers. You set
> a
> > > > batch size of N and we do a request when we have N messages in queue
> > but
> > > > this says nothing about the size of the requests that will be sent.
> You
> > > > might end up sending all N messages to one server or you might end up
> > > > sending 1 message to N different servers (totally defeating the
> purpose
> > > of
> > > > batching).
> > > >
> > > > There are two granularities of batching that could make sense: the
> > broker
> > > > level or the partition level. We do the send requests at the broker
> > level
> > > > but we do the disk IO at the partition level. I propose making the
> > queues
> > > > per-partition rather than per broker to avoid having to reshuffle the
> > > > contents of queues when leadership changes. This could be debated,
> > > though.
> > > >
> > > > > If you actually look at the byte path of the producer this approach allows
> > > > > cleaning a ton of stuff up. We can do in-place writes to the destination
> > > > > buffer that we will eventually send. This does mean moving serialization
> > > > > and compression to the user thread. But I think this is good as these may
> > > > > be slow but aren't unpredictably slow.
> > > >
> > > > The per-partition queues are thus implemented with a bunch of
> > > pre-allocated
> > > > ByteBuffers sized to max.batch.size, when the buffer is full or the
> > delay
> > > > time elapses that buffer is sent.
> > > >
> > > > By doing this we could actually just reuse the buffers when the send
> is
> > > > complete. This would be nice because since the buffers are used for
> > > > allocation they will likely fall out of young gen.
> > > >
> > > > > A question to think about is how we want to bound memory usage. I think
> > > > > what we want is the max.batch.size which controls the size of the
> > > > > individual buffers and total.buffer.memory which controls the total memory
> > > > > used by all buffers. One problem with this is that there is the possibility
> > > > > of some fragmentation. I.e. imagine a situation with 5k partitions being
> > > > > produced to, each getting a low but steady message rate. Giving each of
> > > > > these a 1MB buffer would require 5GB of buffer space to have a buffer for
> > > > > each partition. I'm not sure how bad this is since at least the memory
> > > > > usage is predictable and the reality is that holding thousands of java
> > > > > objects has huge overhead compared to contiguous byte arrays.
> > > >
> > > > -Jay
> > > >
> > >
> >
>

Re: Client improvement discussion

Posted by Jay Kreps <ja...@gmail.com>.
Cool. With respect to compression performance, we definitely see the same
thing, no debate.

Of course if you want to just compress the message payloads you can do that
now without needing much help from kafka--just pass in the compressed data.
Whether or not it will do much depends on the size of the message body--for
small messages you basically need batch compression, but for large messages
just compressing the body is fine. Our extra effort was to get the better
compression ratio of compressed messages.
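
To make the "just pass in the compressed data" option concrete, here is a
minimal sketch of compressing a payload in user code before handing the bytes
to the producer. It is only an illustration: the class name PayloadCompression
is made up, and it uses the JDK's java.util.zip.Deflater as a stand-in because
it needs no extra jar; the tests discussed in this thread used snappy.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Hypothetical helper: compress/decompress a single message payload in user code. */
public class PayloadCompression {

    /** Compress one payload with the JDK Deflater (stand-in for snappy). */
    public static byte[] compress(byte[] payload) {
        Deflater deflater = new Deflater();
        deflater.setInput(payload);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream(Math.max(32, payload.length));
        byte[] chunk = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(chunk);
            out.write(chunk, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    /** Reverse of compress(); this is what a consumer would do with the message body. */
    public static byte[] decompress(byte[] compressed) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream(compressed.length * 2);
        byte[] chunk = new byte[4096];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(chunk);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalArgumentException("truncated or unsupported input");
                }
                out.write(chunk, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("payload is not valid deflate data", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }
}
```

The compressed byte[] is then sent as an ordinary uncompressed message, so the
broker never has to decompress it. Each payload carries its own fixed
compression overhead, which is why this tends to pay off only for larger
message bodies.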

What I was saying about snappy performance is that I think it may be our
inefficiency in the compression code-path rather than the underlying
slowness of snappy. For example on this page
  https://github.com/dain/snappy
the compression performance they list for jni (the library we use) tends to
be around 200MB per core-second, with decompression around 1GB per
core-second. So on a modern machine with umpteen cores that should not be a
bottleneck, right? I don't know this to be true but I am wondering if the
underlying bottleneck is the compression algorithm or our inefficient
code. If you look at kafka.message.ByteBufferMessageSet.{create,
decompress, and assignOffsets} it is pretty inefficient. I did a round of
improvement there but we are still recopying stuff over and over and
creating zillions of little buffers and objects. It is a little tricky to
clean up but probably just a 1-2 day project.
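
As a rough back-of-envelope check on the "should not be a bottleneck" claim,
the sketch below combines the message rates quoted further down in this thread
(25,000 and 80,000 messages/sec at 2.5KB each) with the roughly 200MB per
core-second compression figure above. It assumes 1KB = 1000 bytes and
1MB = 10^6 bytes, and the class name ThroughputEstimate is made up for the
illustration.

```java
/** Hypothetical back-of-envelope calculator; not part of Kafka. */
public class ThroughputEstimate {

    /** Bytes per second implied by a message rate and a fixed message size. */
    public static double bytesPerSecond(long messagesPerSecond, double messageSizeBytes) {
        return messagesPerSecond * messageSizeBytes;
    }

    /** Cores needed to keep up with a byte rate at a given per-core compressor rate. */
    public static double coresNeeded(double bytesPerSecond, double compressorBytesPerCoreSecond) {
        return bytesPerSecond / compressorBytesPerCoreSecond;
    }

    public static void main(String[] args) {
        double messageSize = 2.5 * 1000;   // 2.5KB per message (assuming KB = 1000 bytes)
        double compressRate = 200e6;       // ~200MB per core-second, figure cited above
        for (long rate : new long[] {25_000L, 80_000L}) {
            double bps = bytesPerSecond(rate, messageSize);
            System.out.printf("%d msgs/sec = %.1f MB/sec, needs %.2f cores to compress%n",
                    rate, bps / 1e6, coresNeeded(bps, compressRate));
        }
    }
}
```

Under those assumptions 25,000 messages/sec is 62.5MB/sec and 80,000
messages/sec is 200MB/sec, so raw compression at the listed rate would need
about a third of a core and about one core respectively. That is only an
estimate from published numbers, not a measurement of our code path.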

I would rather confirm that compression itself, and not just our
inefficiency, is the root cause before we do anything too drastic
design-wise. If this is really killing you guys, and if our inefficiency
turns out to be the cause, we would definitely take a patch to optimize
that path now.
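
For what a less copy-heavy path could look like, here is a minimal sketch of
the array-oriented approach mentioned earlier in the thread: messages are
compressed as they arrive, straight into one pre-allocated destination buffer,
rather than being written out uncompressed and compressed afterwards. This is
not the existing ByteBufferMessageSet code. The class InlineBatchCompressor and
its 4-byte length framing are assumptions made for the example, and the JDK
Deflater again stands in for snappy.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Hypothetical sketch: compress messages incrementally into one pre-allocated buffer. */
public class InlineBatchCompressor {
    private final Deflater deflater = new Deflater();
    private final ByteBuffer destination;        // heap buffer that would eventually be sent
    private final byte[] header = new byte[4];   // reused length prefix, no per-message allocation

    public InlineBatchCompressor(int capacityBytes) {
        this.destination = ByteBuffer.allocate(capacityBytes);
    }

    /** Append one message as [4-byte big-endian length][bytes], compressing as we go. */
    public void append(byte[] message) {
        int len = message.length;
        header[0] = (byte) (len >>> 24);
        header[1] = (byte) (len >>> 16);
        header[2] = (byte) (len >>> 8);
        header[3] = (byte) len;
        feed(header);
        feed(message);
    }

    /** Flush the compressor and return the buffer, flipped and ready to read. */
    public ByteBuffer finish() {
        deflater.finish();
        while (!deflater.finished()) {
            writeSome();
        }
        deflater.end();
        destination.flip();
        return destination;
    }

    private void feed(byte[] input) {
        deflater.setInput(input);
        while (!deflater.needsInput()) {
            writeSome();
        }
    }

    /** Deflate directly into the destination's backing array; no intermediate copies. */
    private void writeSome() {
        if (!destination.hasRemaining()) {
            throw new BufferOverflowException();
        }
        int n = deflater.deflate(destination.array(),
                destination.arrayOffset() + destination.position(),
                destination.remaining());
        destination.position(destination.position() + n);
    }

    /** Test helper: inflate up to maxSize bytes from a buffer produced by finish(). */
    public static byte[] decompress(ByteBuffer compressed, int maxSize) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed.array(),
                compressed.arrayOffset() + compressed.position(), compressed.remaining());
        byte[] out = new byte[maxSize];
        int n = 0;
        try {
            while (!inflater.finished() && n < maxSize) {
                int r = inflater.inflate(out, n, maxSize - n);
                if (r == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    break;
                }
                n += r;
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("not valid deflate data", e);
        } finally {
            inflater.end();
        }
        return Arrays.copyOf(out, n);
    }
}
```

Because all messages share one compressor stream, small messages still get
the batch compression ratio, and the only buffers involved are the caller's
byte[] and the destination itself. Whether the same shape is achievable with
the snappy library would need to be checked.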

-Jay




On Fri, Aug 2, 2013 at 4:55 PM, Chris Hogue <cs...@gmail.com> wrote:

> Thanks for the responses. Additional follow-up inline.
>
>
> On Fri, Aug 2, 2013 at 2:21 PM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Great comments, answers inline!
> >
> > On Fri, Aug 2, 2013 at 12:28 PM, Chris Hogue <cs...@gmail.com> wrote:
> >
> > > These sound like great steps. A couple of votes and questions:
> > >
> > > 1. Moving serialization out and basing it all off of byte[] for key and
> > > payload makes sense. Echoing a response below, we've ended up doing that in
> > > some cases anyway, and the others do a trivial transform to bytes with an
> > > Encoder.
> > >
> >
> > Cool.
> >
> >
> > > 2. On the single producer thread, we're actually suffering a bit from this
> > > in 0.8, but it's mostly because compression and the blocking send happen on
> > > this thread. In 0.7, since there was a thread-per-broker, a nice side-effect
> > > was that compression and the blocking could "go wide", at least to the
> > > number of brokers. If compression is moved out and the sends are now
> > > non-blocking then this sounds like a nice improvement.
> > >
> >
> > I think even in 0.7 there was only one thread, right?
> >
> >
> I believe it was actually 1 per broker. Producer.scala iterates the brokers
> and adds a new producer for each. The ProducerPool.addProducer() method
> adds a new AsyncProducer instance for the broker (assuming async mode), and
> each AsyncProducer creates and starts its own ProducerSendThread.
>
> In either case, going to multiplexed I/O and not having the compression on
> this thread probably solves any issue there.
>
>
>
> >
> > > 3. The wiki talks about static partition assignment for consumers. Just
> > > adding a vote for that as we're currently working through how to do
> that
> > > ourselves with the 0.8 consumer.
> > >
> >
> > Cool, yeah currently you must use the simple consumer to get that which
> is
> > a pain.
> >
> >
> > > 4. I'm curious how compression would interact with the new ByteBuffer
> > > buffering you've described. If I'm following correctly you've said that
> > > rather than queueing objects you'd end up doing in-place writes to the
> > > pre-allocated ByteBuffer. Presumably this means the compression has
> > already
> > > happened on the user thread. But if there's no batching/buffering
> except
> > in
> > > the ByteBuffer, is there somewhere that multiple messages will be
> > > compressed together (since it should result in better compression)?
> Maybe
> > > there's still batching before this and I read too much into it?
> > >
> >
> > I'm not 100% sure, but I believe the compression can still be done inline.
> > The compression algorithm will buffer a bit, of course. What we currently
> > do though is write out the full data uncompressed and then compress it.
> > This is pretty inefficient. Basically we are using Java's OutputStream APIs
> > for compression but we need to be using the lower-level array-oriented
> > algorithms (like Deflater). I haven't tried this but my assumption is that
> > we can compress the messages as they arrive into the destination buffer
> > instead of the current approach.
> >
>
>
> Right, was starting to think you may be looking at a way of doing the
> compression incrementally as they come in. Sounds like what you're
> pursuing.
>
>
>
> >
> >
> > > 5. I don't know if this is quite the right place to discuss it, but
> since
> > > the producer has some involvement I'll throw it out there. The
> > un-compress,
> > > assign offsets, re-compress that happens on the broker with the
> built-in
> > > compression API is a significant bottleneck that we're really trying to
> > > avoid. As noted in another thread, we saw a throughput increase on the
> > > order of 3x when we pre-batched and compressed the payloads before
> > sending
> > > it to the producer with 0.8.
> > >
> >
> > Yes, it is a bummer. We think ultimately this does make sense though, for
> > two reasons beyond offsets:
> > 1. You have to validate the integrity of the data the client has sent to
> > you or else one bad or buggy client can screw up all consumers.
> > 2. The compression of the log should not be tied to the compression used by
> > individual producers. We haven't made this change yet, but it is an easy
> > one. The problem today is that if your producers send a variety of
> > compression types your consumers need to handle the union of all types and
> > you have no guarantee over what types producers may send in the future.
> > Instead we think these should be decoupled. The topic should have a
> > compression type property and that should be totally decoupled from the
> > compression type the producer uses. In many cases there is no real need for
> > the producer to use compression at all, as the real thing you want to
> > optimize is later inter-datacenter transfers, not the network send to the
> > local broker, so the producer can just send uncompressed and have the broker
> > control the compression type.
> >
> > The performance really has two causes though:
> > 1. GZIP is super slow, especially java's implementation. But snappy, for
> > example, is actually quite fast. We should be able to do snappy at
> network
> > speeds according to the perf data I have seen, but...
> > 2. ...our current compression code is kind of inefficient due to all the
> > copying and traversal, due to the reasons cited above.
> >
> > So in other words I think we can make this a bit better but it probably
> > won't go away. How do you feel about snappy?
> >
> >
> Sorry, I should have been more clear--these tests were all with Snappy (the
> same library Kafka uses, just called directly from our code before it went
> to the producer). I did an early GZIP test and it was just too far out of
> the ballpark to be useful.
>
> I completely understand the architectural separation and the value you're
> describing here, especially in a general solution where you may have many
> heterogeneous producer and consumer types. In our case it will be pretty
> homogeneous and throughput is a primary concern, hence the focus on this.
>
> I started putting in a description of the benchmarks we did but it's going
> to blow up this thread, so it's probably best if that goes in its own
> separate thread. The summary is that at an application level, this change
> alone is the difference between being able to send 25,000 (2.5KB)
> messages/sec to a single broker vs over 80,000/sec. For comparison, I did a
> small test that simply wrote messages to the kafka logs via the Log class
> (in a standalone app on that machine, not through a server) and saw around
> 170,000 messages/sec. The throughput to the disk as reported by iostat
> reflected a similar change.
>
> Obviously without more detail you'll have to take those numbers as a rough
> sketch, and I'm happy to give more detail separately, but that's a high
> enough cost on the broker that we really think we need to avoid it.
>
>
>
> > > I've not looked very closely at the wire-protocol, but if there was a
> way
> > > for it to support in-place offset assignment even for compressed
> messages
> > > it would be a huge win. Short of that we're fine taking the
> > batch/compress
> > > responsibility into user code, but it would be nice to have a way to do
> > > that while retaining the built-in partition selection (i.e. semantic
> > > partitioning) and other functionality of the producer. The new design
> may
> > > already be an improvement in this area since it would move some
> > > responsibility to the user thread.
> > >
> >
> > We can't really do this because we are multi-writer so any offset we give
> > the client would potentially be used by another producer and then be
> > invalid or non-sequential.
> >
>
> I may have said this in a confusing way. With the tests we did it was still
> the broker assigning offsets, it's just that the message as a whole wasn't
> compressed, only the payloads. So the broker still had plain-bytes access
> to the headers and went through the optimized code path that exists for
> non-compressed messages.
>
>
> Really appreciate your responses and glad to see this making progress.
>
>
>
> >
> > >
> > > Not sure if that's clear, but as the interfaces take shape it may be
> > easier
> > > to see how that will work.
> > >
> > > -Chris
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Jul 26, 2013 at 1:00 PM, Jay Kreps <ja...@gmail.com>
> wrote:
> > >
> > > > I sent around a wiki a few weeks back proposing a set of client
> > > > improvements that essentially amount to a rewrite of the producer and
> > > > consumer java clients.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > > >
> > > > The below discussion assumes you have read this wiki.
> > > >
> > > > I started to do a little prototyping for the producer and wanted to
> > share
> > > > some of the ideas that came up to get early feedback.
> > > >
> > > > First, a few simple but perhaps controversial things to discuss.
> > > >
> > > > Rollout
> > > > Phase 1: We add the new clients. No change on the server. Old clients
> > > still
> > > > exist. The new clients will be entirely in a new package so there
> will
> > be
> > > > no possibility of name collision.
> > > > Phase 2: We swap out all shared code on the server to use the new
> > client
> > > > stuff. At this point the old clients still exist but are essentially
> > > > deprecated.
> > > > Phase 3: We remove the old client code.
> > > >
> > > > Java
> > > > > I think we should do the clients in java. Making our users deal with
> > > > > scala's non-compatibility issues and crazy stack traces causes people a lot
> > > > > of pain. Furthermore we end up having to wrap everything now to get a
> > > > > usable java api anyway for non-scala people. This does mean maintaining a
> > > > > substantial chunk of java code, which is maybe less fun than scala. But
> > > > > basically I think we should optimize for the end user and produce a
> > > > > standalone pure-java jar with no dependencies.
> > > >
> > > > Jars
> > > > We definitely want to separate out the client jar. There is also a
> fair
> > > > amount of code shared between both (exceptions, protocol definition,
> > > utils,
> > > > and the message set implementation). Two approaches.
> > > > > Two jar approach: split kafka.jar into kafka-clients.jar and
> > > > > kafka-server.jar with the server depending on the clients. The advantage of
> > > > > this is that it is simple. The disadvantage is that things like utils and
> > > > > protocol definition will be in the client jar though technically they belong
> > > > > equally to the server.
> > > > Many jar approach: split kafka.jar into kafka-common.jar,
> > > > kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
> > > > kafka-server.jar. The disadvantage of this is that the user needs two
> > > jars
> > > > (common + something) which is for sure going to confuse people. I
> also
> > > > think this will tend to spawn more jars over time.
> > > >
> > > > Background threads
> > > > I am thinking of moving both serialization and compression out of the
> > > > background send thread. I will explain a little about this idea
> below.
> > > >
> > > > Serialization
> > > > I am not sure if we should handle serialization in the client at all.
> > > > Basically I wonder if our own API wouldn't just be a lot simpler if
> we
> > > took
> > > > a byte[] key and byte[] value and let people serialize stuff
> > themselves.
> > > > Injecting a class name for us to create the serializer is more
> > roundabout
> > > > and has a lot of problems if the serializer itself requires a lot of
> > > > configuration or other objects to be instantiated.
> > > >
> > > > Partitioning
> > > > The real question with serialization is whether the partitioning
> should
> > > > happen on the java object or on the byte array key. The argument for
> > > doing
> > > > it on the java object is that it is easier to do something like a
> range
> > > > partition on the object. The problem with doing it on the object is
> > that
> > > > the consumer may not be in java and so may not be able to reproduce
> the
> > > > partitioning. For example we currently use Object.hashCode which is a
> > > > little sketchy. We would be better off doing a standardized hash
> > function
> > > > on the key bytes. If we want to give the partitioner access to the
> > > original
> > > > java object then obviously we need to handle serialization behind our
> > > api.
> > > >
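To make the "standardized hash function on the key bytes" idea concrete, here is a minimal sketch. It assumes FNV-1a (32-bit) as the hash, which is only an illustrative choice and not something the proposal specifies; the class and method names are hypothetical. The point is that any client in any language can reproduce the same partition from the same bytes.

```java
import java.nio.charset.StandardCharsets;

class KeyBytesPartitioner {
    // FNV-1a 32-bit constants. FNV-1a is this sketch's choice, not the proposal's.
    private static final int FNV_OFFSET_BASIS = 0x811c9dc5;
    private static final int FNV_PRIME = 0x01000193;

    /** FNV-1a hash computed over the raw serialized key bytes. */
    static int fnv1a(byte[] key) {
        int hash = FNV_OFFSET_BASIS;
        for (byte b : key) {
            hash ^= (b & 0xff);
            hash *= FNV_PRIME;
        }
        return hash;
    }

    /** Maps a serialized key to a partition in [0, numPartitions). */
    static int partition(byte[] key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Mask the sign bit so the modulo result is never negative.
        return (fnv1a(key) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println("partition = " + partition(key, 8));
    }
}
```

Unlike Object.hashCode, the result depends only on the bytes, so a non-java consumer can compute it too.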
> > > > Names
> > > > I think good names are important. I would like to rename the
> following
> > > > classes in the new client:
> > > >   Message=>Record: Now that the message has both a message and a key
> it
> > > is
> > > > more of a KeyedMessage. Another name for a KeyedMessage is a Record.
> > > >   MessageSet=>Records: This isn't too important but nit pickers
> > complain
> > > > that it is not technically a Set but rather a List or Sequence but
> > > > MessageList sounds funny to me.
> > > >
> > > > The actual clients will not interact with these classes. They will
> > > interact
> > > > with a ProducerRecord and ConsumerRecord. The reason for having
> > different
> > > > fields is because the different clients
> > > > Proposed producer API:
> > > > SendResponse r = producer.send(new ProducerRecord(topic, key, value))
> > > >
> > > > Protocol Definition
> > > >
> > > > Here is what I am thinking about protocol definition. I see a couple
> > > > of problems with what we are doing currently. First the protocol
> > > > definition is spread throughout a bunch of custom java objects. The
> > > > error reporting in these objects is really terrible because they don't
> > > > record the field names. Furthermore people keep adding business logic
> > > > into the protocol objects which is pretty nasty.
> > > >
> > > > I would like to move to having a single Protocol.java file that
> defines
> > > the
> > > > protocol in a readable DSL. Here is what I am thinking:
> > > >
> > > >   public static Schema REQUEST_HEADER =
> > > >     new Schema(new Field("api_key", INT16, "The id of the request type."),
> > > >                new Field("api_version", INT16, "The version of the API."),
> > > >                new Field("correlation_id", INT32,
> > > >                          "A user-supplied integer value that will be passed back with the response"),
> > > >                new Field("client_id", STRING,
> > > >                          "A user specified identifier for the client making the request."));
> > > >
> > > > To parse one of these requests you would do
> > > >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> > > >    short apiKey = struct.get("api_key");
> > > >
> > > > Internally Struct is just an Object[] with one entry per field which
> is
> > > > populated from the schema. The mapping of name to array index is a
> hash
> > > > table lookup. We can optimize access for performance critical areas
> by
> > > > allowing:
> > > >    // do this once to lookup the index of the field
> > > >    static Field apiKeyField = REQUEST_HEADER.getField("api_key");
> > > >    ...
> > > >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> > > >    short apiKey = struct.get(apiKeyField); // now this is just an array access
> > > >
> > > > One advantage of this is this level of indirection will make it really
> > > > easy for us to handle backwards compatibility in a more principled
> > > > way. The protocol file will actually contain ALL versions of the
> > > > schema and we will always use the appropriate version to read the
> > > > request (as specified in the header).
> > > >
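For readers who want to see how the Schema / Field / Struct pieces described above could fit together, here is a rough, self-contained sketch. It is not the prototype code: the Type enum, the int16-length-prefixed STRING encoding, the sampleHeader() helper, and the untyped Object return of get() (hence the casts) are all assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class ProtocolSketch {
    /** Wire types; each knows how to read itself from a buffer. */
    enum Type {
        INT16 { Object read(ByteBuffer b) { return b.getShort(); } },
        INT32 { Object read(ByteBuffer b) { return b.getInt(); } },
        STRING {
            // Assumed encoding: int16 length followed by UTF-8 bytes.
            Object read(ByteBuffer b) {
                byte[] bytes = new byte[b.getShort()];
                b.get(bytes);
                return new String(bytes, StandardCharsets.UTF_8);
            }
        };
        abstract Object read(ByteBuffer b);
    }

    static final class Field {
        final String name;
        final Type type;
        final String doc;
        int index = -1; // assigned by the owning Schema

        Field(String name, Type type, String doc) {
            this.name = name;
            this.type = type;
            this.doc = doc;
        }
    }

    static final class Schema {
        private final Field[] fields;
        private final Map<String, Field> byName = new HashMap<>();

        Schema(Field... fields) {
            this.fields = fields;
            for (int i = 0; i < fields.length; i++) {
                fields[i].index = i;
                byName.put(fields[i].name, fields[i]);
            }
        }

        Field getField(String name) {
            Field f = byName.get(name);
            if (f == null) throw new IllegalArgumentException("Unknown field: " + name);
            return f;
        }

        Struct parse(ByteBuffer buffer) {
            Object[] values = new Object[fields.length];
            for (Field f : fields) {
                try {
                    values[f.index] = f.type.read(buffer);
                } catch (RuntimeException e) {
                    // Errors name the field that failed to parse.
                    throw new IllegalArgumentException("Error reading field '" + f.name + "'", e);
                }
            }
            return new Struct(this, values);
        }
    }

    static final class Struct {
        private final Schema schema;
        private final Object[] values;

        Struct(Schema schema, Object[] values) {
            this.schema = schema;
            this.values = values;
        }

        Object get(String name) { return values[schema.getField(name).index]; } // hash lookup
        Object get(Field field) { return values[field.index]; }                 // array access
    }

    static final Schema REQUEST_HEADER = new Schema(
        new Field("api_key", Type.INT16, "The id of the request type."),
        new Field("api_version", Type.INT16, "The version of the API."),
        new Field("correlation_id", Type.INT32, "A user-supplied integer passed back with the response."),
        new Field("client_id", Type.STRING, "A user specified identifier for the client."));

    /** Builds a small example header (hypothetical values) for the demo. */
    static ByteBuffer sampleHeader() {
        byte[] clientId = "demo".getBytes(StandardCharsets.UTF_8);
        ByteBuffer b = ByteBuffer.allocate(2 + 2 + 4 + 2 + clientId.length);
        b.putShort((short) 0).putShort((short) 1).putInt(42);
        b.putShort((short) clientId.length).put(clientId);
        b.flip();
        return b;
    }

    public static void main(String[] args) {
        Struct struct = REQUEST_HEADER.parse(sampleHeader());
        short apiKey = (Short) struct.get("api_key");
        System.out.println("api_key=" + apiKey + " client_id=" + struct.get("client_id"));
    }
}
```

Supporting several protocol versions would then amount to keeping one Schema per version and picking the right one from api_version before parsing the body.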
> > > > NIO layer
> > > >
> > > > The plan is to add a non-blocking multi-connection abstraction that
> > would
> > > > be used by both clients.
> > > >
> > > > class Selector {
> > > >   /* create a new connection and associate it with the given id */
> > > >   public void connect(int id, InetSocketAddress address,
> > > >                       int sendBufferSize, int receiveBufferSize)
> > > >   /* wakeup this selector if it is currently awaiting data */
> > > >   public void wakeup()
> > > >   /* user provides sends, receives, and a timeout. this method will
> > > >      populate "completed" and "disconnects" lists. Method blocks for up
> > > >      to the timeout waiting for data to read. */
> > > >   public void poll(long timeout, List<Send> sends, List<Send> completed,
> > > >                    List<Receive> receives, List<Integer> disconnects)
> > > > }
> > > >
> > > > The consumer and producer would then each define their own logic to
> > > manage
> > > > their set of in-flight requests.
> > > >
> > > > Producer Implementation
> > > >
> > > > There are a couple of interesting changes I think we can make to the
> > > > producer implementation.
> > > >
> > > > We retain the single background "sender" thread.
> > > >
> > > > But we can remove the definition of sync vs async clients. We always
> > > return
> > > > a "future" response immediately. Both sync and async sends would go
> > > through
> > > > the buffering that we currently do for the async layer. This would
> mean
> > > > that even in sync mode while the event loop is doing network IO if
> many
> > > > requests accumulate they will be sent in a single batch. This
> > effectively
> > > > acts as a kind of "group commit". So instead of having an "async"
> mode
> > > that
> > > > acts differently in some way you just have a max.delay time that
> > controls
> > > > how long the client will linger waiting for more data to accumulate.
> > > > max.delay=0 is equivalent to the current sync producer.
> > > >
> > > > I would also propose changing our buffering strategy. Currently we
> > queue
> > > > unserialized requests in a BlockingQueue. This is not ideal as it is
> > very
> > > > difficult to reason about the memory usage of this queue. One 5MB
> > message
> > > > may be bigger than 10k small messages. I propose that (1) we change
> our
> > > > queuing strategy to queue per-partition and (2) we directly write the
> > > > messages to the ByteBuffer which will eventually be sent and use that
> > as
> > > > the "queue". The batch size should likewise be in bytes not in number
> > of
> > > > messages.
> > > >
> > > > If you think about it our current queuing strategy doesn't really
> make
> > > > sense any more now that we always load balance over brokers. You set
> a
> > > > batch size of N and we do a request when we have N messages in queue
> > but
> > > > this says nothing about the size of the requests that will be sent.
> You
> > > > might end up sending all N messages to one server or you might end up
> > > > sending 1 message to N different servers (totally defeating the
> purpose
> > > of
> > > > batching).
> > > >
> > > > There are two granularities of batching that could make sense: the
> > broker
> > > > level or the partition level. We do the send requests at the broker
> > level
> > > > but we do the disk IO at the partition level. I propose making the
> > queues
> > > > per-partition rather than per broker to avoid having to reshuffle the
> > > > contents of queues when leadership changes. This could be debated,
> > > though.
> > > >
> > > > If you actually look at the byte path of the producer this approach
> > > > allows cleaning a ton of stuff up. We can do in-place writes to the
> > > > destination buffer that we will eventually send. This does mean moving
> > > > serialization and compression to the user thread. But I think this is
> > > > good as these may be slow but aren't unpredictably slow.
> > > >
> > > > The per-partition queues are thus implemented with a bunch of
> > > pre-allocated
> > > > ByteBuffers sized to max.batch.size, when the buffer is full or the
> > delay
> > > > time elapses that buffer is sent.
> > > >
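A rough sketch of the per-partition buffering described above, for illustration only. The class name, the int32 length-prefix framing, and the seal()/drainReady() methods are assumptions; the delay timer and thread safety are left out, so a real sender thread would call seal() itself when the delay elapses.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PartitionAccumulatorSketch {
    private final int maxBatchSize; // bytes per buffer, in the spirit of max.batch.size
    private final Map<Integer, ByteBuffer> open = new HashMap<>();
    private final List<ByteBuffer> ready = new ArrayList<>();

    PartitionAccumulatorSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /** Writes a length-prefixed record directly into the partition's buffer. */
    void append(int partition, byte[] record) {
        int needed = 4 + record.length; // assumed framing: int32 length + bytes
        if (needed > maxBatchSize) {
            throw new IllegalArgumentException("Record larger than batch size");
        }
        ByteBuffer buffer = open.get(partition);
        if (buffer != null && buffer.remaining() < needed) {
            seal(partition); // buffer is full: hand it to the sender
            buffer = null;
        }
        if (buffer == null) {
            buffer = ByteBuffer.allocate(maxBatchSize);
            open.put(partition, buffer);
        }
        buffer.putInt(record.length).put(record);
    }

    /** Moves a partition's open buffer to the ready list, e.g. when the delay elapses. */
    void seal(int partition) {
        ByteBuffer buffer = open.remove(partition);
        if (buffer != null && buffer.position() > 0) {
            buffer.flip();
            ready.add(buffer);
        }
    }

    /** Returns and clears the batches that are ready to send. */
    List<ByteBuffer> drainReady() {
        List<ByteBuffer> out = new ArrayList<>(ready);
        ready.clear();
        return out;
    }

    public static void main(String[] args) {
        PartitionAccumulatorSketch acc = new PartitionAccumulatorSketch(16);
        for (int i = 0; i < 3; i++) {
            acc.append(0, new byte[4]);
        }
        System.out.println("ready batches: " + acc.drainReady().size());
    }
}
```

Because batches are bounded in bytes rather than message count, the size of each send is known up front regardless of how large individual messages are.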
> > > > By doing this we could actually just reuse the buffers when the send
> is
> > > > complete. This would be nice because since the buffers are used for
> > > > allocation they will likely fall out of young gen.
> > > >
> > > > A question to think about is how we want to bound memory usage. I
> > > > think what we want is the max.batch.size which controls the size of
> > > > the individual buffers and total.buffer.memory which controls the
> > > > total memory used by all buffers. One problem with this is that there
> > > > is the possibility of some fragmentation. E.g. imagine a situation
> > > > with 5k partitions being produced to, each getting a low but steady
> > > > message rate. Giving each of these a 1MB buffer would require 5GB of
> > > > buffer space to have a buffer for each partition. I'm not sure how bad
> > > > this is since at least the memory usage is predictable and the reality
> > > > is that holding thousands of java objects has huge overhead compared
> > > > to contiguous byte arrays.
> > > >
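The two limits above (a per-buffer size and a total memory bound) plus buffer reuse can be sketched as a small pool. This is an illustration under assumptions: the class name is hypothetical, and allocate() returns null when the bound is hit, whereas a real producer would more likely block or report an error.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class BufferPoolSketch {
    private final int batchSize;    // size of each buffer, like max.batch.size
    private final long totalMemory; // overall bound, like total.buffer.memory
    private long allocated = 0;     // bytes handed out or sitting in the free list
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    BufferPoolSketch(long totalMemory, int batchSize) {
        this.totalMemory = totalMemory;
        this.batchSize = batchSize;
    }

    /** Returns a buffer, or null if creating one would exceed totalMemory. */
    synchronized ByteBuffer allocate() {
        ByteBuffer reused = free.pollFirst();
        if (reused != null) {
            return reused; // reuse instead of allocating again
        }
        if (allocated + batchSize > totalMemory) {
            return null;
        }
        allocated += batchSize;
        return ByteBuffer.allocate(batchSize);
    }

    /** Hands a buffer back once its send has completed. */
    synchronized void release(ByteBuffer buffer) {
        buffer.clear();
        free.addFirst(buffer);
    }

    /** Memory needed to give every partition its own buffer. */
    static long worstCaseBytes(int partitions, int batchSize) {
        return (long) partitions * batchSize;
    }

    public static void main(String[] args) {
        // 5k partitions with 1 MiB each, the fragmentation case from the text.
        System.out.println(worstCaseBytes(5000, 1 << 20) + " bytes");
    }
}
```

With 1 MiB taken as 1,048,576 bytes, 5,000 buffers come to 5,242,880,000 bytes, which is the roughly 5GB figure mentioned above.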
> > > > -Jay
> > > >
> > >
> >
>

Re: Client improvement discussion

Posted by Chris Hogue <cs...@gmail.com>.
Thanks for the responses. Additional follow-up inline.


On Fri, Aug 2, 2013 at 2:21 PM, Jay Kreps <ja...@gmail.com> wrote:

> Great comments, answers inline!
>
> On Fri, Aug 2, 2013 at 12:28 PM, Chris Hogue <cs...@gmail.com> wrote:
>
> > These sound like great steps. A couple of votes and questions:
> >
> > 1.  Moving serialization out and basing it all off of byte[] for key and
> > payload makes sense. Echoing a response below, we've ended up doing that
> in
> > some cases anyway, and the others do a trivial transform to bytes with an
> > Encoder.
> >
>
> Cool.
>
>
> > 2. On the single producer thread, we're actually suffering a bit from
> this
> > in 0.8, but it's mostly because compression and the blocking send happen
> on
> > this thread. In 0.7 since there was a thread-per-broker, a nice
> side-effect
> > was that compression and the blocking could "go wide", at least to the
> > number of brokers. If compression is moved out and the sends are now
> > non-blocking then this sounds like a nice improvement.
> >
>
> I think even in 0.7 there was only one thread, right?
>
>
I believe it was actually 1 per broker. Producer.scala iterates the brokers
and adds a new producer for each. The ProducerPool.addProducer() method
adds a new AsyncProducer instance for the broker (assuming async mode), and
each AsyncProducer creates and starts its own ProducerSendThread.

In either case, going to multiplexed I/O and not having the compression on
this thread probably solves any issue there.



>
> > 3. The wiki talks about static partition assignment for consumers. Just
> > adding a vote for that as we're currently working through how to do that
> > ourselves with the 0.8 consumer.
> >
>
> Cool, yeah currently you must use the simple consumer to get that which is
> a pain.
>
>
> > 4. I'm curious how compression would interact with the new ByteBuffer
> > buffering you've described. If I'm following correctly you've said that
> > rather than queueing objects you'd end up doing in-place writes to the
> > pre-allocated ByteBuffer. Presumably this means the compression has
> already
> > happened on the user thread. But if there's no batching/buffering except
> in
> > the ByteBuffer, is there somewhere that multiple messages will be
> > compressed together (since it should result in better compression)? Maybe
> > there's still batching before this and I read too much into it?
> >
>
> I'm not 100% sure, but I believe the compression can still be done inline.
> The compression algorithm will buffer a bit, of course. What we currently
> do though is write out the full data uncompressed and then compress it.
> This is pretty inefficient. Basically we are using Java's OutputStream apis
> for compression but we need to be using the lower-level array oriented
> algorithms (like Deflater). I haven't tried this but my assumption is that
> we can compress the messages as they arrive into the destination buffer
> instead of the current approach.
>


Right, was starting to think you may be looking at a way of doing the
compression incrementally as they come in. Sounds like what you're pursuing.
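
A minimal sketch of what that incremental approach could look like with java.util.zip.Deflater, writing compressed output straight into a pre-allocated array as each message arrives. This is an illustration, not the producer code: the class and method names are hypothetical, a plain byte[] stands in for the batch buffer, and message framing is ignored.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

class IncrementalCompressionSketch {
    private final Deflater deflater = new Deflater();
    private final byte[] dest; // stands in for the pre-allocated batch buffer
    private int position = 0;

    IncrementalCompressionSketch(int capacity) {
        this.dest = new byte[capacity];
    }

    /** Feeds one message to the compressor; any output goes straight into dest. */
    void append(byte[] message) {
        deflater.setInput(message);
        while (!deflater.needsInput()) {
            drain();
        }
    }

    /** Finishes the stream and returns the number of compressed bytes in dest. */
    int finish() {
        deflater.finish();
        while (!deflater.finished()) {
            drain();
        }
        deflater.end();
        return position;
    }

    private void drain() {
        if (position == dest.length) {
            throw new IllegalStateException("Destination buffer is full");
        }
        position += deflater.deflate(dest, position, dest.length - position);
    }

    byte[] buffer() {
        return dest;
    }

    /** Inflates the first `length` bytes of `compressed`, to check the round trip. */
    static byte[] inflate(byte[] compressed, int length, int originalSize) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed, 0, length);
        byte[] out = new byte[originalSize];
        int n = 0;
        try {
            while (n < originalSize && !inflater.finished()) {
                int read = inflater.inflate(out, n, originalSize - n);
                if (read == 0 && inflater.needsInput()) break;
                n += read;
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("Corrupt compressed data", e);
        } finally {
            inflater.end();
        }
        return out;
    }

    public static void main(String[] args) {
        IncrementalCompressionSketch batch = new IncrementalCompressionSketch(1024);
        batch.append("first message ".getBytes(StandardCharsets.UTF_8));
        batch.append("second message".getBytes(StandardCharsets.UTF_8));
        System.out.println("compressed bytes: " + batch.finish());
    }
}
```

Since all messages share one Deflater stream, later messages are compressed against the earlier ones, which is where the better ratio from batching comes from.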



>
>
> > 5. I don't know if this is quite the right place to discuss it, but since
> > the producer has some involvement I'll throw it out there. The
> > un-compress, assign offsets, re-compress that happens on the broker with
> > the built-in compression API is a significant bottleneck that we're really
> > trying to avoid. As noted in another thread, we saw a throughput increase
> > on the order of 3x when we pre-batched and compressed the payloads before
> > sending them to the producer with 0.8.
> >
>
> Yes, it is a bummer. We think ultimately this does make sense though, for
> two reasons beyond offsets:
> 1. You have to validate the integrity of the data the client has sent to
> you or else one bad or buggy client can screw up all consumers.
> 2. The compression of the log should not be tied to the compression used by
> individual producers. We haven't made this change yet, but it is an easy
> one. The problem today is that if your producers send a variety of
> compression types your consumers need to handle the union of all types and
> you have no guarantee over what types producers may send in the future.
> Instead we think these should be decoupled. The topic should have a
> compression type property and that should be totally decoupled from the
> compression type the producer uses. In many cases there is no real need for
> the producer to use compression at all as the real thing you want to
> optimize is later inter-datacenter transfers, not the network send to the
> local broker so the producer can just send uncompressed and have the broker
> control the compression type.
>
> The performance really has two causes though:
> 1. GZIP is super slow, especially java's implementation. But snappy, for
> example, is actually quite fast. We should be able to do snappy at network
> speeds according to the perf data I have seen, but...
> 2. ...our current compression code is kind of inefficient due to all the
> copying and traversal, due to the reasons cited above.
>
> So in other words I think we can make this a bit better but it probably
> won't go away. How do you feel about snappy?
>
>
Sorry, I should have been more clear--these tests were all with Snappy (the
same library Kafka uses, just called directly from our code before it went
to the producer). I did an early GZIP test and it was just too far out of
the ballpark to be useful.

I completely understand the architectural separation and the value you're
describing here, especially in a general solution where you may have many
heterogeneous producer and consumer types. In our case it will be pretty
homogeneous and throughput is a primary concern, hence the focus on this.

I started putting in a description of the benchmarks we did but it's going
to blow up this thread, so it's probably best if that goes in its own
separate thread. The summary is that at an application level, this change
alone is the difference between being able to send 25,000 (2.5KB)
messages/sec to a single broker vs over 80,000/sec. For comparison, I did a
small test that simply wrote messages to the kafka logs via the Log class
(in a standalone app on that machine, not through a server) and saw around
170,000 messages/sec. The throughput to the disk as reported by iostat
reflected a similar change.

Obviously without more detail you'll have to take those numbers as a rough
sketch, and I'm happy to give more detail separately, but that's a high
enough cost on the broker that we really think we need to avoid it.



> > I've not looked very closely at the wire-protocol, but if there was a way
> > for it to support in-place offset assignment even for compressed messages
> > it would be a huge win. Short of that we're fine taking the
> batch/compress
> > responsibility into user code, but it would be nice to have a way to do
> > that while retaining the built-in partition selection (i.e. semantic
> > partitioning) and other functionality of the producer. The new design may
> > already be an improvement in this area since it would move some
> > responsibility to the user thread.
> >
>
> We can't really do this because we are multi-writer so any offset we give
> the client would potentially be used by another producer and then be
> invalid or non-sequential.
>

I may have said this in a confusing way. With the tests we did it was still
the broker assigning offsets, it's just that the message as a whole wasn't
compressed, only the payloads. So the broker still had plain-bytes access
to the headers and went through the optimized code path that exists for
non-compressed messages.


Really appreciate your responses and glad to see this making progress.



>
> >
> > Not sure if that's clear, but as the interfaces take shape it may be
> easier
> > to see how that will work.
> >
> > -Chris
> >
> >
> >
> >
> >
> >
> > On Fri, Jul 26, 2013 at 1:00 PM, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > I sent around a wiki a few weeks back proposing a set of client
> > > improvements that essentially amount to a rewrite of the producer and
> > > consumer java clients.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > >
> > > The below discussion assumes you have read this wiki.
> > >
> > > I started to do a little prototyping for the producer and wanted to
> share
> > > some of the ideas that came up to get early feedback.
> > >
> > > First, a few simple but perhaps controversial things to discuss.
> > >
> > > Rollout
> > > Phase 1: We add the new clients. No change on the server. Old clients
> > still
> > > exist. The new clients will be entirely in a new package so there will
> be
> > > no possibility of name collision.
> > > Phase 2: We swap out all shared code on the server to use the new
> client
> > > stuff. At this point the old clients still exist but are essentially
> > > deprecated.
> > > Phase 3: We remove the old client code.
> > >
> > > Java
> > > I think we should do the clients in java. Making our users deal with
> > > scala's incompatibility issues and crazy stack traces causes people a
> > > lot of pain. Furthermore we end up having to wrap everything now to get
> > > a usable java api anyway for non-scala people. This does mean
> > > maintaining a substantial chunk of java code, which is maybe less fun
> > > than scala. But basically I think we should optimize for the end user
> > > and produce a standalone pure-java jar with no dependencies.
> > >
> > > Jars
> > > We definitely want to separate out the client jar. There is also a fair
> > > amount of code shared between both (exceptions, protocol definition,
> > utils,
> > > and the message set implementation). Two approaches.
> > > Two jar approach: split kafka.jar into kafka-clients.jar and
> > > kafka-server.jar with the server depending on the clients. The
> > > advantage of this is that it is simple. The disadvantage is that things
> > > like utils and protocol definition will be in the client jar though
> > > technically they belong equally to the server.
> > > Many jar approach: split kafka.jar into kafka-common.jar,
> > > kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
> > > kafka-server.jar. The disadvantage of this is that the user needs two
> > jars
> > > (common + something) which is for sure going to confuse people. I also
> > > think this will tend to spawn more jars over time.
> > >
> > > Background threads
> > > I am thinking of moving both serialization and compression out of the
> > > background send thread. I will explain a little about this idea below.
> > >
> > > Serialization
> > > I am not sure if we should handle serialization in the client at all.
> > > Basically I wonder if our own API wouldn't just be a lot simpler if we
> > took
> > > a byte[] key and byte[] value and let people serialize stuff
> themselves.
> > > Injecting a class name for us to create the serializer is more
> roundabout
> > > and has a lot of problems if the serializer itself requires a lot of
> > > configuration or other objects to be instantiated.
> > >
> > > Partitioning
> > > The real question with serialization is whether the partitioning should
> > > happen on the java object or on the byte array key. The argument for
> > doing
> > > it on the java object is that it is easier to do something like a range
> > > partition on the object. The problem with doing it on the object is
> that
> > > the consumer may not be in java and so may not be able to reproduce the
> > > partitioning. For example we currently use Object.hashCode which is a
> > > little sketchy. We would be better off doing a standardized hash
> function
> > > on the key bytes. If we want to give the partitioner access to the
> > original
> > > java object then obviously we need to handle serialization behind our
> > api.
> > >
> > > Names
> > > I think good names are important. I would like to rename the following
> > > classes in the new client:
> > >   Message=>Record: Now that the message has both a message and a key it
> > is
> > > more of a KeyedMessage. Another name for a KeyedMessage is a Record.
> > >   MessageSet=>Records: This isn't too important but nit pickers
> complain
> > > that it is not technically a Set but rather a List or Sequence but
> > > MessageList sounds funny to me.
> > >
> > > The actual clients will not interact with these classes. They will
> > interact
> > > with a ProducerRecord and ConsumerRecord. The reason for having
> different
> > > fields is because the different clients
> > > Proposed producer API:
> > > SendResponse r = producer.send(new ProducerRecord(topic, key, value))
> > >
> > > Protocol Definition
> > >
> > > Here is what I am thinking about protocol definition. I see a couple of
> > > problems with what we are doing currently. First the protocol
> > > definition is spread throughout a bunch of custom java objects. The
> > > error reporting in these objects is really terrible because they don't
> > > record the field names. Furthermore people keep adding business logic
> > > into the protocol objects which is pretty nasty.
> > >
> > > I would like to move to having a single Protocol.java file that defines
> > the
> > > protocol in a readable DSL. Here is what I am thinking:
> > >
> > >   public static Schema REQUEST_HEADER =
> > >     new Schema(new Field("api_key", INT16, "The id of the request type."),
> > >                new Field("api_version", INT16, "The version of the API."),
> > >                new Field("correlation_id", INT32,
> > >                          "A user-supplied integer value that will be passed back with the response"),
> > >                new Field("client_id", STRING,
> > >                          "A user specified identifier for the client making the request."));
> > >
> > > To parse one of these requests you would do
> > >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> > >    short apiKey = struct.get("api_key");
> > >
> > > Internally Struct is just an Object[] with one entry per field which is
> > > populated from the schema. The mapping of name to array index is a hash
> > > table lookup. We can optimize access for performance critical areas by
> > > allowing:
> > >    // do this once to lookup the index of the field
> > >    static Field apiKeyField = REQUEST_HEADER.getField("api_key");
> > >    ...
> > >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> > >    short apiKey = struct.get(apiKeyField); // now this is just an array access
> > >
> > > One advantage of this is this level of indirection will make it really
> > > easy for us to handle backwards compatibility in a more principled way.
> > > The protocol file will actually contain ALL versions of the schema and
> > > we will always use the appropriate version to read the request (as
> > > specified in the header).
> > >
> > > NIO layer
> > >
> > > The plan is to add a non-blocking multi-connection abstraction that
> would
> > > be used by both clients.
> > >
> > > class Selector {
> > >   /* create a new connection and associate it with the given id */
> > >   public void connect(int id, InetSocketAddress address,
> > >                       int sendBufferSize, int receiveBufferSize)
> > >   /* wakeup this selector if it is currently awaiting data */
> > >   public void wakeup()
> > >   /* user provides sends, receives, and a timeout. this method will
> > >      populate "completed" and "disconnects" lists. Method blocks for up
> > >      to the timeout waiting for data to read. */
> > >   public void poll(long timeout, List<Send> sends, List<Send> completed,
> > >                    List<Receive> receives, List<Integer> disconnects)
> > > }
> > >
> > > The consumer and producer would then each define their own logic to
> > manage
> > > their set of in-flight requests.
> > >
> > > Producer Implementation
> > >
> > > There are a couple of interesting changes I think we can make to the
> > > producer implementation.
> > >
> > > We retain the single background "sender" thread.
> > >
> > > But we can remove the definition of sync vs async clients. We always
> > return
> > > a "future" response immediately. Both sync and async sends would go
> > through
> > > the buffering that we currently do for the async layer. This would mean
> > > that even in sync mode while the event loop is doing network IO if many
> > > requests accumulate they will be sent in a single batch. This
> effectively
> > > acts as a kind of "group commit". So instead of having an "async" mode
> > that
> > > acts differently in some way you just have a max.delay time that
> controls
> > > how long the client will linger waiting for more data to accumulate.
> > > max.delay=0 is equivalent to the current sync producer.
> > >
> > > I would also propose changing our buffering strategy. Currently we
> queue
> > > unserialized requests in a BlockingQueue. This is not ideal as it is
> very
> > > difficult to reason about the memory usage of this queue. One 5MB
> message
> > > may be bigger than 10k small messages. I propose that (1) we change our
> > > queuing strategy to queue per-partition and (2) we directly write the
> > > messages to the ByteBuffer which will eventually be sent and use that
> as
> > > the "queue". The batch size should likewise be in bytes not in number
> of
> > > messages.
> > >
> > > If you think about it our current queuing strategy doesn't really make
> > > sense any more now that we always load balance over brokers. You set a
> > > batch size of N and we do a request when we have N messages in queue
> but
> > > this says nothing about the size of the requests that will be sent. You
> > > might end up sending all N messages to one server or you might end up
> > > sending 1 message to N different servers (totally defeating the purpose
> > of
> > > batching).
> > >
> > > There are two granularities of batching that could make sense: the
> broker
> > > level or the partition level. We do the send requests at the broker
> level
> > > but we do the disk IO at the partition level. I propose making the
> queues
> > > per-partition rather than per broker to avoid having to reshuffle the
> > > contents of queues when leadership changes. This could be debated,
> > though.
> > >
> > > If you actually look at the byte path of the producer this approach
> > > allows cleaning a ton of stuff up. We can do in-place writes to the
> > > destination buffer that we will eventually send. This does mean moving
> > > serialization and compression to the user thread. But I think this is
> > > good as these may be slow but aren't unpredictably slow.
> > >
> > > The per-partition queues are thus implemented with a bunch of
> > pre-allocated
> > > ByteBuffers sized to max.batch.size, when the buffer is full or the
> delay
> > > time elapses that buffer is sent.
> > >
> > > By doing this we could actually just reuse the buffers when the send is
> > > complete. This would be nice because since the buffers are used for
> > > allocation they will likely fall out of young gen.
> > >
> > > A question to think about is how we want to bound memory usage. I think
> > > what we want is the max.batch.size which controls the size of the
> > > individual buffers and total.buffer.memory which controls the total
> > > memory used by all buffers. One problem with this is that there is the
> > > possibility of some fragmentation. E.g. imagine a situation with 5k
> > > partitions being produced to, each getting a low but steady message
> > > rate. Giving each of these a 1MB buffer would require 5GB of buffer
> > > space to have a buffer for each partition. I'm not sure how bad this is
> > > since at least the memory usage is predictable and the reality is that
> > > holding thousands of java objects has huge overhead compared to
> > > contiguous byte arrays.
> > >
> > > -Jay
> > >
> >
>

Re: Client improvement discussion

Posted by Chris Hogue <cs...@gmail.com>.
Thanks for the responses. Additional follow-up inline.


On Fri, Aug 2, 2013 at 2:21 PM, Jay Kreps <ja...@gmail.com> wrote:

> Great comments, answers inline!
>
> On Fri, Aug 2, 2013 at 12:28 PM, Chris Hogue <cs...@gmail.com> wrote:
>
> > These sound like great steps. A couple of votes and questions:
> >
> > 1.  Moving serialization out and basing it all off of byte[] for key and
> > payload makes sense. Echoing a response below, we've ended up doing that
> in
> > some cases anyway, and the others do a trivial transform to bytes with an
> > Encoder.
> >
>
> Cool.
>
>
> > 2. On the single producer thread, we're actually suffering a bit from
> this
> > in 0.8, but it's mostly because compression and the blocking send happen
> on
> > this thread. In 0.7 since there was a thread-per-broker, a nice
> side-effect
> > was that compression and the blocking could "go wide", at least to the
> > number of brokers. If compression is moved out and the sends are now
> > non-blocking then this sounds like a nice improvement.
> >
>
> I think even in 0.7 there was only one thread, right?
>
>
I believe it was actually 1 per broker. Producer.scala iterates the brokers
and adds a new producer for each. The ProducerPool.addProducer() method
adds a new AsyncProducer instance for the broker (assuming async mode), and
each AsyncProducer creates and starts its own ProducerSendThread.

In either case, going to multiplexed I/O and not having the compression on
this thread probably solves any issue there.



>
> > 3. The wiki talks about static partition assignment for consumers. Just
> > adding a vote for that as we're currently working through how to do that
> > ourselves with the 0.8 consumer.
> >
>
> Cool, yeah currently you must use the simple consumer to get that which is
> a pain.
>
>
> > 4. I'm curious how compression would interact with the new ByteBuffer
> > buffering you've described. If I'm following correctly you've said that
> > rather than queueing objects you'd end up doing in-place writes to the
> > pre-allocated ByteBuffer. Presumably this means the compression has
> already
> > happened on the user thread. But if there's no batching/buffering except
> in
> > the ByteBuffer, is there somewhere that multiple messages will be
> > compressed together (since it should result in better compression)? Maybe
> > there's still batching before this and I read too much into it?
> >
>
> I'm not 100% sure, but I believe the compression can still be done inline.
> The compression algorithm will buffer a bit, of course. What we currently
> do though is write out the full data uncompressed and then compress it.
> This is pretty inefficient. Basically we are using Java's OutputStream APIs
> for compression but we need to be using the lower-level, array-oriented
> APIs such as Deflater. I haven't tried this but my assumption is that
> we can compress the messages as they arrive into the destination buffer
> instead of the current approach.
>


Right, I was starting to think you might be looking at a way to compress the
messages incrementally as they come in. Sounds like that's what you're pursuing.



>
>
> > 5. I don't know if this is quite the right place to discuss it, but since
> > the producer has some involvement I'll throw it out there. The un-compress,
> > assign offsets, re-compress that happens on the broker with the built-in
> > compression API is a significant bottleneck that we're really trying to
> > avoid. As noted in another thread, we saw a throughput increase on the
> > order of 3x when we pre-batched and compressed the payloads before sending
> > it to the producer with 0.8.
> >
>
> Yes, it is a bummer. We think ultimately this does make sense though, for
> two reasons beyond offsets:
> 1. You have to validate the integrity of the data the client has sent to
> you or else one bad or buggy client can screw up all consumers.
> 2. The compression of the log should not be tied to the compression used by
> individual producers. We haven't made this change yet, but it is an easy
> one. The problem today is that if your producers send a variety of
> compression types your consumers need to handle the union of all types and
> you have no guarantee over what types producers may send in the future.
> Instead we think these should be decoupled. The topic should have a
> compression type property and that should be totally decoupled from the
> compression type the producer uses. In many cases there is no real need for
> the producer to use compression at all, as the real thing you want to
> optimize is later inter-datacenter transfers, not the network send to the
> local broker, so the producer can just send uncompressed and have the broker
> control the compression type.
>
> The performance really has two causes though:
> 1. GZIP is super slow, especially Java's implementation. But Snappy, for
> example, is actually quite fast. We should be able to do Snappy at network
> speeds according to the perf data I have seen, but...
> 2. ...our current compression code is kind of inefficient because of all the
> copying and traversal, for the reasons cited above.
>
> So in other words I think we can make this a bit better but it probably
> won't go away. How do you feel about snappy?
>
>
Sorry, I should have been more clear--these tests were all with Snappy (the
same library Kafka uses, just called directly from our code before it went
to the producer). I did an early GZIP test and it was just too far out of
the ballpark to be useful.

I completely understand the architectural separation and the value you're
describing here, especially in a general solution where you may have many
heterogeneous producer and consumer types. In our case it will be pretty
homogeneous and throughput is a primary concern, hence the focus on this.

I started putting in a description of the benchmarks we did but it's going
to blow up this thread, so it's probably best if that goes in its own
separate thread. The summary is that at an application level, this change
alone is the difference between being able to send 25,000 (2.5KB)
messages/sec to a single broker vs over 80,000/sec. For comparison, I did a
small test that simply wrote messages to the kafka logs via the Log class
(in a standalone app on that machine, not through a server) and saw around
170,000 messages/sec. The throughput to the disk as reported by iostat
reflected a similar change.

Obviously without more detail you'll have to take those numbers as a rough
sketch, and I'm happy to give more detail separately, but that's a high
enough cost on the broker that we really think we need to avoid it.



> > I've not looked very closely at the wire-protocol, but if there was a way
> > for it to support in-place offset assignment even for compressed messages
> > it would be a huge win. Short of that we're fine taking the batch/compress
> > responsibility into user code, but it would be nice to have a way to do
> > that while retaining the built-in partition selection (i.e. semantic
> > partitioning) and other functionality of the producer. The new design may
> > already be an improvement in this area since it would move some
> > responsibility to the user thread.
> >
>
> We can't really do this because we are multi-writer so any offset we give
> the client would potentially be used by another producer and then be
> invalid or non-sequential.
>

I may have said this in a confusing way. With the tests we did it was still
the broker assigning offsets; it's just that the message as a whole wasn't
compressed, only the payloads. So the broker still had plain-bytes access
to the headers and went through the optimized code path that exists for
non-compressed messages.
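
To make the idea concrete, here is a rough sketch of application-level
batching along these lines. It is illustrative only: the class and method
names (PayloadBatchSketch, pack, unpack) and the length-prefixed framing are
hypothetical, and java.util.zip stands in for Snappy so that the example has
no dependencies. The packed byte[] would be handed to the producer as the
value of a single, uncompressed message.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

/** Hypothetical helper: many small payloads become one compressed message value. */
class PayloadBatchSketch {

    /** Packs the payloads as [count][len][bytes]... and compresses the whole batch. */
    static byte[] pack(List<byte[]> payloads) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the stream finishes the compressed data before we read it back.
        try (DataOutputStream out = new DataOutputStream(new DeflaterOutputStream(bytes))) {
            out.writeInt(payloads.size());
            for (byte[] payload : payloads) {
                out.writeInt(payload.length);
                out.write(payload);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Consumer side: decompresses one message value back into its payloads. */
    static List<byte[]> unpack(byte[] value) {
        try (DataInputStream in = new DataInputStream(
                new InflaterInputStream(new ByteArrayInputStream(value)))) {
            int count = in.readInt();
            List<byte[]> payloads = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                byte[] payload = new byte[in.readInt()];
                in.readFully(payload);
                payloads.add(payload);
            }
            return payloads;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the broker only sees an ordinary uncompressed message, it never has
to decompress and recompress anything; the trade-off is that consumers must
know about the framing and unpack the batch themselves.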


Really appreciate your responses and glad to see this making progress.




Re: Client improvement discussion

Posted by Jay Kreps <ja...@gmail.com>.
Great comments, answers inline!

On Fri, Aug 2, 2013 at 12:28 PM, Chris Hogue <cs...@gmail.com> wrote:

> These sound like great steps. A couple of votes and questions:
>
> 1.  Moving serialization out and basing it all off of byte[] for key and
> payload makes sense. Echoing a response below, we've ended up doing that in
> some cases anyway, and the others do a trivial transform to bytes with an
> Encoder.
>

Cool.


> 2. On the single producer thread, we're actually suffering a bit from this
> in 0.8, but it's mostly because compression and the blocking send happen on
> this thread. In 0.7 since there was a thread-per-broker, a nice side-effect
> was that compression and the blocking could "go wide", at least to the
> number of brokers. If compression is moved out and the sends are now
> non-blocking then this sounds like a nice improvement.
>

I think even in 0.7 there was only one thread, right?


> 3. The wiki talks about static partition assignment for consumers. Just
> adding a vote for that as we're currently working through how to do that
> ourselves with the 0.8 consumer.
>

Cool, yeah currently you must use the simple consumer to get that which is
a pain.


> 4. I'm curious how compression would interact with the new ByteBuffer
> buffering you've described. If I'm following correctly you've said that
> rather than queueing objects you'd end up doing in-place writes to the
> pre-allocated ByteBuffer. Presumably this means the compression has already
> happened on the user thread. But if there's no batching/buffering except in
> the ByteBuffer, is there somewhere that multiple messages will be
> compressed together (since it should result in better compression)? Maybe
> there's still batching before this and I read too much into it?
>

I'm not 100% sure, but I believe the compression can still be done inline.
The compression algorithm will buffer a bit, of course. What we currently
do though is write out the full data uncompressed and then compress it.
This is pretty inefficient. Basically we are using Java's OutputStream APIs
for compression but we need to be using the lower-level, array-oriented
APIs such as Deflater. I haven't tried this but my assumption is that
we can compress the messages as they arrive into the destination buffer
instead of the current approach.
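
A minimal sketch of what that might look like, using only java.util.zip. This
is an illustration, not the prototype code: the class name, the
length-prefixed framing, and the readAll helper are all hypothetical. Each
message is fed to a Deflater as it arrives and the compressed output lands
directly in the buffer that would eventually be sent.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Hypothetical sketch: compress messages straight into the destination buffer. */
class InlineCompressionSketch {
    private final Deflater deflater = new Deflater();
    private final ByteBuffer destination; // heap buffer standing in for a pre-allocated batch buffer

    InlineCompressionSketch(int capacityBytes) {
        this.destination = ByteBuffer.allocate(capacityBytes);
    }

    /** Appends one length-prefixed message; no uncompressed copy of the batch is kept. */
    void append(byte[] message) {
        feed(ByteBuffer.allocate(4).putInt(message.length).array());
        feed(message);
    }

    private void feed(byte[] input) {
        deflater.setInput(input);
        while (!deflater.needsInput()) {
            drain();
        }
    }

    /** Moves whatever compressed output is ready into the destination buffer. */
    private void drain() {
        if (!destination.hasRemaining()) {
            throw new BufferOverflowException(); // a real client would start a new batch here
        }
        int written = deflater.deflate(destination.array(),
                destination.arrayOffset() + destination.position(),
                destination.remaining(), Deflater.NO_FLUSH);
        destination.position(destination.position() + written);
    }

    /** Finishes the compressed stream and returns the buffer, flipped for reading. */
    ByteBuffer close() {
        deflater.finish();
        while (!deflater.finished()) {
            drain();
        }
        deflater.end();
        destination.flip();
        return destination;
    }

    /** Helper for checking the result: inflates a batch back into its messages. */
    static List<String> readAll(ByteBuffer batch, int maxUncompressedBytes) {
        Inflater inflater = new Inflater();
        inflater.setInput(batch.array(), batch.arrayOffset() + batch.position(), batch.remaining());
        byte[] out = new byte[maxUncompressedBytes];
        int n = 0;
        try {
            while (!inflater.finished() && n < out.length) {
                int read = inflater.inflate(out, n, out.length - n);
                if (read == 0 && inflater.needsInput()) break; // truncated input
                n += read;
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException(e);
        } finally {
            inflater.end();
        }
        ByteBuffer plain = ByteBuffer.wrap(out, 0, n);
        List<String> messages = new ArrayList<>();
        while (plain.remaining() >= 4) {
            byte[] message = new byte[plain.getInt()];
            plain.get(message);
            messages.add(new String(message, StandardCharsets.UTF_8));
        }
        return messages;
    }
}
```

Since all messages in a batch share one Deflater, they are still compressed
together, which should keep the compression ratio benefit of batching.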


> 5. I don't know if this is quite the right place to discuss it, but since
> the producer has some involvement I'll throw it out there. The un-compress,
> assign offsets, re-compress that happens on the broker with the built-in
> compression API is a significant bottleneck that we're really trying to
> avoid. As noted in another thread, we saw a throughput increase on the
> order of 3x when we pre-batched and compressed the payloads before sending
> it to the producer with 0.8.
>

Yes, it is a bummer. We think ultimately this does make sense though, for
two reasons beyond offsets:
1. You have to validate the integrity of the data the client has sent to
you or else one bad or buggy client can screw up all consumers.
2. The compression of the log should not be tied to the compression used by
individual producers. We haven't made this change yet, but it is an easy
one. The problem today is that if your producers send a variety of
compression types your consumers need to handle the union of all types and
you have no guarantee over what types producers may send in the future.
Instead we think these should be decoupled. The topic should have a
compression type property and that should be totally decoupled from the
compression type the producer uses. In many cases there is no real need for
the producer to use compression at all, as the real thing you want to
optimize is later inter-datacenter transfers, not the network send to the
local broker, so the producer can just send uncompressed and have the broker
control the compression type.

The performance really has two causes though:
1. GZIP is super slow, especially Java's implementation. But Snappy, for
example, is actually quite fast. We should be able to do Snappy at network
speeds according to the perf data I have seen, but...
2. ...our current compression code is kind of inefficient because of all the
copying and traversal, for the reasons cited above.

So in other words I think we can make this a bit better but it probably
won't go away. How do you feel about snappy?


> I've not looked very closely at the wire-protocol, but if there was a way
> for it to support in-place offset assignment even for compressed messages
> it would be a huge win. Short of that we're fine taking the batch/compress
> responsibility into user code, but it would be nice to have a way to do
> that while retaining the built-in partition selection (i.e. semantic
> partitioning) and other functionality of the producer. The new design may
> already be an improvement in this area since it would move some
> responsibility to the user thread.
>

We can't really do this because we are multi-writer so any offset we give
the client would potentially be used by another producer and then be
invalid or non-sequential.


>
> Not sure if that's clear, but as the interfaces take shape it may be easier
> to see how that will work.
>
> -Chris
>
>
>
>
>
>
> On Fri, Jul 26, 2013 at 1:00 PM, Jay Kreps <ja...@gmail.com> wrote:
>
> > I sent around a wiki a few weeks back proposing a set of client
> > improvements that essentially amount to a rewrite of the producer and
> > consumer java clients.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> >
> > The below discussion assumes you have read this wiki.
> >
> > I started to do a little prototyping for the producer and wanted to share
> > some of the ideas that came up to get early feedback.
> >
> > First, a few simple but perhaps controversial things to discuss.
> >
> > Rollout
> > Phase 1: We add the new clients. No change on the server. Old clients
> still
> > exist. The new clients will be entirely in a new package so there will be
> > no possibility of name collision.
> > Phase 2: We swap out all shared code on the server to use the new client
> > stuff. At this point the old clients still exist but are essentially
> > deprecated.
> > Phase 3: We remove the old client code.
> >
> > Java
> > I think we should do the clients in java. Making our users deal with
> > scala's non-compatability issues and crazy stack traces causes people a
> lot
> > of pain. Furthermore we end up having to wrap everything now to get a
> > usable java api anyway for non-scala people. This does mean maintaining a
> > substantial chunk of java code, which is maybe less fun than scala. But
> > basically i think we should optimize for the end user and produce a
> > standalone pure-java jar with no dependencies.
> >
> > Jars
> > We definitely want to separate out the client jar. There is also a fair
> > amount of code shared between both (exceptions, protocol definition,
> utils,
> > and the message set implementation). Two approaches.
> > Two jar approach: split kafka.jar into kafka-clients.jar and
> > kafka-server.jar with the server depending on the clients. The advantage
> of
> > this is that it is simple. The disadvantage is that things like utils and
> > protocol definition will be in the client jar though technical they
> belong
> > equally to the server.
> > Many jar approach: split kafka.jar into kafka-common.jar,
> > kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
> > kafka-server.jar. The disadvantage of this is that the user needs two
> jars
> > (common + something) which is for sure going to confuse people. I also
> > think this will tend to spawn more jars over time.
> >
> > Background threads
> > I am thinking of moving both serialization and compression out of the
> > background send thread. I will explain a little about this idea below.
> >
> > Serialization
> > I am not sure if we should handle serialization in the client at all.
> > Basically I wonder if our own API wouldn't just be a lot simpler if we
> took
> > a byte[] key and byte[] value and let people serialize stuff themselves.
> > Injecting a class name for us to create the serializer is more roundabout
> > and has a lot of problems if the serializer itself requires a lot of
> > configuration or other objects to be instantiated.
> >
> > Partitioning
> > The real question with serialization is whether the partitioning should
> > happen on the java object or on the byte array key. The argument for
> doing
> > it on the java object is that it is easier to do something like a range
> > partition on the object. The problem with doing it on the object is that
> > the consumer may not be in java and so may not be able to reproduce the
> > partitioning. For example we currently use Object.hashCode which is a
> > little sketchy. We would be better off doing a standardized hash function
> > on the key bytes. If we want to give the partitioner access to the
> original
> > java object then obviously we need to handle serialization behind our
> api.
> >
> > Names
> > I think good names are important. I would like to rename the following
> > classes in the new client:
> >   Message=>Record: Now that the message has both a message and a key it
> is
> > more of a KeyedMessage. Another name for a KeyedMessage is a Record.
> >   MessageSet=>Records: This isn't too important but nit pickers complain
> > that it is not technically a Set but rather a List or Sequence but
> > MessageList sounds funny to me.
> >
> > The actual clients will not interact with these classes. They will
> interact
> > with a ProducerRecord and ConsumerRecord. The reason for having different
> > fields is because the different clients
> > Proposed producer API:
> > SendResponse r = producer.send(new ProducerRecord(topic, key, value))
> >
> > Protocol Definition
> >
> > Here is what I am thinking about protocol definition. I see a couple of
> > problems with what we are doing currently. First the protocol definition
> is
> > spread throughout a bunch of custom java objects. The error reporting in
> > these object is really terrible because they don't record the field
> names.
> > Furthermore people keep adding business logic into the protocol objects
> > which is pretty nasty.
> >
> > I would like to move to having a single Protocol.java file that defines
> the
> > protocol in a readable DSL. Here is what I am thinking:
> >
> >   public static Schema REQUEST_HEADER =
> >
> >     new Schema(new Field("api_key", INT16, "The id of the request
> type."),
> >
> >                new Field("api_version", INT16, "The version of the
> API."),
> >
> >                  new Field("correlation_id", INT32, "A user-supplied
> > integer value that will be passed back with the response"),
> >
> >                  new Field("client_id", STRING, "A user specified
> > identifier for the client making the request."));
> >
> > To parse one of these requests you would do
> >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> >    short apiKey = struct.get("api_key");
> >
> > Internally Struct is just an Object[] with one entry per field which is
> > populated from the schema. The mapping of name to array index is a hash
> > table lookup. We can optimize access for performance critical areas by
> > allowing:
> >    static Field apiKeyField = REQUEST_HEADER.getField("api_key"); // do
> > this once to lookup the index of the field
> >    ...
> >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> >    short apiKey = struct.get(apiKeyField); // now this is just an array
> > access
> >
> > One advantage of this is this level of indirection will make it really
> easy
> > for us to handle backwards compatability in a more principled way. The
> > protocol file will actually contain ALL versions of the schema and we
> will
> > always use the appropriate version to read the request (as specified in
> the
> > header).
> >
> > NIO layer
> >
> > The plan is to add a non-blocking multi-connection abstraction that would
> > be used by both clients.
> >
> > class Selector {
> >   /* create a new connection and associate it with the given id */
> >   public void connect(int id, InetSocketAddress address,
> intsendBufferSize,
> > int receiveBufferSize)
> >   /* wakeup this selector if it is currently awaiting data */
> >   public void wakeup()
> >   /* user provides sends, recieves, and a timeout. this method will
> > populate "completed" and "disconnects" lists. Method blocks for up to the
> > timeout waiting for data to read. */
> >   public void poll(long timeout, List<Send> sends, List<Send> completed,
> > List<Receive> receives, List<Integer> disconnects)
> > }
> >
> > The consumer and producer would then each define their own logic to
> manage
> > their set of in-flight requests.
> >
> > Producer Implementation
> >
> > There are a couple of interesting changes I think we can make to the
> > producer implementation.
> >
> > We retain the single background "sender" thread.
> >
> > But we can remove the definition of sync vs async clients. We always
> return
> > a "future" response immediately. Both sync and async sends would go
> through
> > the buffering that we currently do for the async layer. This would mean
> > that even in sync mode while the event loop is doing network IO if many
> > requests accumulate they will be sent in a single batch. This effectively
> > acts as a kind of "group commit". So instead of having an "async" mode
> that
> > acts differently in some way you just have a max.delay time that controls
> > how long the client will linger waiting for more data to accumulate.
> > max.delay=0 is equivalent to the current sync producer.
> >
> > I would also propose changing our buffering strategy. Currently we queue
> > unserialized requests in a BlockingQueue. This is not ideal as it is very
> > difficult to reason about the memory usage of this queue. One 5MB message
> > may be bigger than 10k small messages. I propose that (1) we change our
> > queuing strategy to queue per-partition and (2) we directly write the
> > messages to the ByteBuffer which will eventually be sent and use that as
> > the "queue". The batch size should likewise be in bytes not in number of
> > messages.
> >
> > If you think about it our current queuing strategy doesn't really make
> > sense any more now that we always load balance over brokers. You set a
> > batch size of N and we do a request when we have N messages in queue but
> > this says nothing about the size of the requests that will be sent. You
> > might end up sending all N messages to one server or you might end up
> > sending 1 message to N different servers (totally defeating the purpose
> of
> > batching).
> >
> > There are two granularities of batching that could make sense: the broker
> > level or the partition level. We do the send requests at the broker level
> > but we do the disk IO at the partition level. I propose making the queues
> > per-partition rather than per broker to avoid having to reshuffle the
> > contents of queues when leadership changes. This could be debated,
> though.
> >
> > If you actually look at the byte path of the producer this approach
> allows
> > cleaning a ton of stuff up. We can do in-place writes to the destination
> > buffer that we will eventually send. This does mean moving serialization
> > and compression to the user thread. But I think this is good as these may
> > be slow but aren't unpredictably slow.
> >
> > The per-partition queues are thus implemented with a bunch of
> pre-allocated
> > ByteBuffers sized to max.batch.size; when the buffer is full or the delay
> > time elapses that buffer is sent.
> >
> > By doing this we could actually just reuse the buffers when the send is
> > complete. This would be nice because since the buffers are used for
> > allocation they will likely fall out of young gen.
> >
> > A question to think about is how we want to bound memory usage. I think
> > what we want is the max.batch.size which controls the size of the
> > individual buffers and total.buffer.memory which controls the total
> memory
> > used by all buffers. One problem with this is that there is the
> possibility
> > of some fragmentation. I.e. imagine a situation with 5k partitions being
> > produced to, each getting a low but steady message rate. Giving each of
> > these a 1MB buffer would require 5GB of buffer space to have a buffer for
> > each partition. I'm not sure how bad this is since at least the memory
> > usage is predictable and the reality is that holding thousands of java
> > objects has huge overhead compared to contiguous byte arrays.
> >
> > -Jay
> >
>

Re: Client improvement discussion

Posted by Jay Kreps <ja...@gmail.com>.
Great comments, answers inline!

On Fri, Aug 2, 2013 at 12:28 PM, Chris Hogue <cs...@gmail.com> wrote:

> These sound like great steps. A couple of votes and questions:
>
> 1.  Moving serialization out and basing it all off of byte[] for key and
> payload makes sense. Echoing a response below, we've ended up doing that in
> some cases anyway, and the others do a trivial transform to bytes with an
> Encoder.
>

Cool.


> 2. On the single producer thread, we're actually suffering a bit from this
> in 0.8, but it's mostly because compression and the blocking send happen on
> this thread. In 0.7 since there was a thread-per-broker, a nice side-effect
> was that compression and the blocking could "go wide", at least to the
> number of brokers. If compression is moved out and the sends are now
> non-blocking then this sounds like a nice improvement.
>

I think even in 0.7 there was only one thread, right?


> 3. The wiki talks about static partition assignment for consumers. Just
> adding a vote for that as we're currently working through how to do that
> ourselves with the 0.8 consumer.
>

Cool, yeah, currently you must use the simple consumer to get that, which is
a pain.


> 4. I'm curious how compression would interact with the new ByteBuffer
> buffering you've described. If I'm following correctly you've said that
> rather than queueing objects you'd end up doing in-place writes to the
> pre-allocated ByteBuffer. Presumably this means the compression has already
> happened on the user thread. But if there's no batching/buffering except in
> the ByteBuffer, is there somewhere that multiple messages will be
> compressed together (since it should result in better compression)? Maybe
> there's still batching before this and I read too much into it?
>

I'm not 100% sure, but I believe the compression can still be done inline.
The compression algorithm will buffer a bit, of course. What we currently
do though is write out the full data uncompressed and then compress it.
This is pretty inefficient. Basically we are using Java's OutputStream APIs
for compression, but we need to be using the lower-level array-oriented
APIs such as Deflater. I haven't tried this, but my assumption is that
we can compress the messages into the destination buffer as they arrive
instead of writing them out uncompressed first.
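
To make the inline idea concrete, here is a rough, untested-in-Kafka sketch of compressing messages into a destination array as they arrive, using java.util.zip.Deflater directly. The class name, the fixed-size destination array, and the inflate helper are assumptions made up for illustration; none of this is taken from the prototype.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical sketch: one compressor per destination buffer.
class InlineCompressionSketch {
    private final Deflater deflater = new Deflater();
    private final byte[] dest;   // stands in for the pre-allocated batch buffer
    private int position = 0;    // next free byte in dest

    InlineCompressionSketch(int capacity) {
        this.dest = new byte[capacity];
    }

    // Feed one message to the compressor. Any compressed bytes it emits are
    // written straight into dest; the Deflater may hold some data internally.
    void append(byte[] message) {
        deflater.setInput(message);
        while (!deflater.needsInput()) {
            if (position == dest.length) throw new IllegalStateException("buffer full");
            position += deflater.deflate(dest, position, dest.length - position);
        }
    }

    // Flush whatever the compressor is still holding and return the
    // number of compressed bytes now in dest.
    int finish() {
        deflater.finish();
        while (!deflater.finished()) {
            if (position == dest.length) throw new IllegalStateException("buffer full");
            position += deflater.deflate(dest, position, dest.length - position);
        }
        deflater.end();
        return position;
    }

    byte[] buffer() {
        return dest;
    }

    // Helper used only to check the round trip.
    static byte[] inflate(byte[] compressed, int length, int maxSize) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed, 0, length);
        byte[] out = new byte[maxSize];
        try {
            int n = 0;
            while (!inflater.finished() && n < maxSize) {
                int r = inflater.inflate(out, n, maxSize - n);
                if (r == 0 && (inflater.needsInput() || inflater.needsDictionary())) break;
                n += r;
            }
            return Arrays.copyOf(out, n);
        } catch (DataFormatException e) {
            throw new IllegalStateException(e);
        } finally {
            inflater.end();
        }
    }

    public static void main(String[] args) {
        InlineCompressionSketch batch = new InlineCompressionSketch(1024);
        batch.append("hello ".getBytes(StandardCharsets.UTF_8));
        batch.append("world".getBytes(StandardCharsets.UTF_8));
        int len = batch.finish();
        byte[] plain = inflate(batch.buffer(), len, 64);
        System.out.println(new String(plain, StandardCharsets.UTF_8));
    }
}
```

Because all messages go through one Deflater, they are compressed together as a single stream, which is what should preserve the better ratio of batch compression.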


> 5. I don't know if this is quite the right place to discuss it, but since
> the producer has some involvement I'll throw it out there. The un-compress,
> assign offsets, re-compress that happens on the broker with the built-in
> compression API is a significant bottleneck that we're really trying to
> avoid. As noted in another thread, we saw a throughput increase on the
> order of 3x when we pre-batched and compressed the payloads before sending
> them to the producer with 0.8.
>

Yes, it is a bummer. We think ultimately this does make sense though, for
two reasons beyond offsets:
1. You have to validate the integrity of the data the client has sent to
you or else one bad or buggy client can screw up all consumers.
2. The compression of the log should not be tied to the compression used by
individual producers. We haven't made this change yet, but it is an easy
one. The problem today is that if your producers send a variety of
compression types your consumers need to handle the union of all types and
you have no guarantee over what types producers may send in the future.
Instead we think these should be decoupled. The topic should have a
compression type property and that should be totally decoupled from the
compression type the producer uses. In many cases there is no real need for
the producer to use compression at all, as the real thing you want to
optimize is later inter-datacenter transfers, not the network send to the
local broker. So the producer can just send uncompressed and have the broker
control the compression type.

The performance problem really has two causes, though:
1. GZIP is super slow, especially Java's implementation. But snappy, for
example, is actually quite fast. We should be able to do snappy at network
speeds according to the perf data I have seen, but...
2. ...our current compression code is kind of inefficient because of all the
copying and traversal described above.

So in other words I think we can make this a bit better but it probably
won't go away. How do you feel about snappy?


> I've not looked very closely at the wire-protocol, but if there was a way
> for it to support in-place offset assignment even for compressed messages
> it would be a huge win. Short of that we're fine taking the batch/compress
> responsibility into user code, but it would be nice to have a way to do
> that while retaining the built-in partition selection (i.e. semantic
> partitioning) and other functionality of the producer. The new design may
> already be an improvement in this area since it would move some
> responsibility to the user thread.
>

We can't really do this because we are multi-writer, so any offset we give
the client could be used by another producer and would then be
invalid or non-sequential.


>
> Not sure if that's clear, but as the interfaces take shape it may be easier
> to see how that will work.
>
> -Chris
>

Re: Client improvement discussion

Posted by Chris Hogue <cs...@gmail.com>.
These sound like great steps. A couple of votes and questions:

1.  Moving serialization out and basing it all off of byte[] for key and
payload makes sense. Echoing a response below, we've ended up doing that in
some cases anyway, and the others do a trivial transform to bytes with an
Encoder.

2. On the single producer thread, we're actually suffering a bit from this
in 0.8, but it's mostly because compression and the blocking send happen on
this thread. In 0.7 since there was a thread-per-broker, a nice side-effect
was that compression and the blocking could "go wide", at least to the
number of brokers. If compression is moved out and the sends are now
non-blocking then this sounds like a nice improvement.

3. The wiki talks about static partition assignment for consumers. Just
adding a vote for that as we're currently working through how to do that
ourselves with the 0.8 consumer.

4. I'm curious how compression would interact with the new ByteBuffer
buffering you've described. If I'm following correctly you've said that
rather than queueing objects you'd end up doing in-place writes to the
pre-allocated ByteBuffer. Presumably this means the compression has already
happened on the user thread. But if there's no batching/buffering except in
the ByteBuffer, is there somewhere that multiple messages will be
compressed together (since it should result in better compression)? Maybe
there's still batching before this and I read too much into it?

5. I don't know if this is quite the right place to discuss it, but since
the producer has some involvement I'll throw it out there. The un-compress,
assign offsets, re-compress that happens on the broker with the built-in
compression API is a significant bottleneck that we're really trying to
avoid. As noted in another thread, we saw a throughput increase on the
order of 3x when we pre-batched and compressed the payloads before sending
them to the producer with 0.8.

I've not looked very closely at the wire-protocol, but if there was a way
for it to support in-place offset assignment even for compressed messages
it would be a huge win. Short of that we're fine taking the batch/compress
responsibility into user code, but it would be nice to have a way to do
that while retaining the built-in partition selection (i.e. semantic
partitioning) and other functionality of the producer. The new design may
already be an improvement in this area since it would move some
responsibility to the user thread.

Not sure if that's clear, but as the interfaces take shape it may be easier
to see how that will work.

-Chris







Re: Client improvement discussion

Posted by Jay Kreps <ja...@gmail.com>.
Hey Chris,

Great questions.

1. Sync vs async. Actually I am saying the client will (1) always be async
but (2) always return a response. So
   val resp = client.send(k, v)  // returns immediately
   resp.await() // waits for request to complete
   resp.offset() // waits for request to complete (if needed) and returns offset
Basically the response object acts as a Future for the offset/error. So if
you like the blocking behavior you can just do
   client.send(k,v).await()
which would behave as it does now.
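
A minimal sketch of the "response as a Future" idea from point 1, assuming a Java 8+ runtime. The class name SendResponseSketch and the complete()/fail() hooks are hypothetical stand-ins for whatever the network thread would call; this is not the proposed client API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the response object; names are illustrative only.
final class SendResponseSketch {
    private final CompletableFuture<Long> offset = new CompletableFuture<>();

    // Called by the (imagined) network thread when the broker acknowledges the send.
    void complete(long assignedOffset) {
        offset.complete(assignedOffset);
    }

    // Called by the (imagined) network thread if the request fails.
    void fail(Exception error) {
        offset.completeExceptionally(error);
    }

    // Blocks until the request finishes, successfully or not.
    void await() {
        try {
            offset.join();
        } catch (RuntimeException e) {
            // Swallowed here; the failure is rethrown by offset().
        }
    }

    // Blocks if needed, then returns the offset or rethrows the failure.
    long offset() {
        return offset.join();
    }

    // Non-blocking check, useful before deciding whether to wait.
    boolean isDone() {
        return offset.isDone();
    }
}
```

With this shape, a caller who wants the old blocking behavior just calls await() right after send, and a caller who does not simply keeps the object around.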

2. Yeah this is in the protocol. I should add it to the wiki.

3. onCompletion registers a callback that will either fire immediately (if
the request is already completed) or will fire when the request does
complete. The purpose is to allow handling multiple responses without
blocking (as await does), but at the cost of a slightly more complicated
programming model.
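
The "fire immediately if already completed, otherwise fire on completion" behavior from point 3 can be sketched as below. CompletionSketch and markDone() are hypothetical names; the only behavior taken from the description above is when the callback runs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback holder; not the proposed client class.
final class CompletionSketch {
    private final List<Runnable> callbacks = new ArrayList<>();
    private boolean done = false;

    // Registers a callback; runs it right away if the request already completed.
    void onCompletion(Runnable callback) {
        synchronized (this) {
            if (!done) {
                callbacks.add(callback);
                return;
            }
        }
        callback.run(); // already complete: fire immediately, outside the lock
    }

    // Called once by the (imagined) network thread when the response arrives.
    void markDone() {
        List<Runnable> toRun;
        synchronized (this) {
            if (done) {
                return;
            }
            done = true;
            toRun = new ArrayList<>(callbacks);
            callbacks.clear();
        }
        for (Runnable r : toRun) {
            r.run(); // pending callbacks run on the completing thread
        }
    }
}
```

One consequence of this design is that a callback may run either on the caller's thread or on the completing thread, which is part of the "slightly more complicated programming model".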

4. Consuming from just some topics. I see what you are saying. I don't
think I can actually have this work the way you want though because the
fetch responses are by broker not by topic. So you could ignore a broker
but it would just ignore all topics on that broker. What we probably do
need is a way to deregister a topic which would let you stop fetching for
that topic.

5. A lot of your questions are on the group membership bit. Let me just
clean it up; I think the writing is sloppy. The proposal was to implement a
generic group membership facility where each group member heartbeats to
remain a member. I was proposing to omit any partition assignment logic
from the server as that can be done client side once all group members
agree on who is in the group. So a simple approach would be that clients
know their topics and each take ~P/N partitions where P is the total
partition count and N the number of group members. P is available from the
topic metadata request and N is filled in by the group membership api. As
you say the client will likely want to commit its offsets before leaving a
group to ensure a clean hand-off.
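
A rough sketch of the client-side ~P/N split from point 5, assuming every member sees the same member list and partition count. The sort-then-slice rule and the name RangeAssignmentSketch are illustrative assumptions, not something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: each member runs this locally and gets a disjoint slice.
final class RangeAssignmentSketch {
    // Returns the partitions owned by `self`, given the agreed member list and partition count.
    static List<Integer> assign(List<String> members, String self, int partitionCount) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted); // every member derives the same order
        int index = sorted.indexOf(self);
        if (index < 0) {
            throw new IllegalArgumentException("not a group member: " + self);
        }
        int n = sorted.size();
        int base = partitionCount / n;  // everyone gets at least floor(P/N)
        int extra = partitionCount % n; // the first `extra` members get one more
        int start = index * base + Math.min(index, extra);
        int count = base + (index < extra ? 1 : 0);
        List<Integer> owned = new ArrayList<>();
        for (int p = start; p < start + count; p++) {
            owned.add(p);
        }
        return owned;
    }
}
```

For example, with members c1, c2, c3 and P = 8, c1 would take partitions 0-2, c2 would take 3-5, and c3 would take 6-7.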

6. Jars. Actually our incompatibility in 0.8 was not due to jars but due
to not implementing compatible protocol changes (partially due to major
refactoring of protocol). Staying compatible will be even easier with the
serialization layer which would effectively have all protocol versions ever
and always use the writer's version for decoding.
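
To illustrate "always use the writer's version for decoding", here is a small sketch in which the reader keeps a decoder for every version and picks one based on a version field at the front of the buffer. The two layouts, the leading int16 version field, and the class name are made up for the example; they are not the real Kafka wire format.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical registry of per-version decoders.
final class VersionedDecoderSketch {
    private final Map<Short, Function<ByteBuffer, String>> decoders = new HashMap<>();

    VersionedDecoderSketch() {
        // v0 (made up): body is a single int32.
        decoders.put((short) 0, buf -> "v0 value=" + buf.getInt());
        // v1 (made up): an int64 id followed by an int32.
        decoders.put((short) 1, buf -> "v1 id=" + buf.getLong() + " value=" + buf.getInt());
    }

    // Reads the version the writer recorded, then decodes with that version's layout.
    String decode(ByteBuffer buf) {
        short version = buf.getShort();
        Function<ByteBuffer, String> decoder = decoders.get(version);
        if (decoder == null) {
            throw new IllegalArgumentException("unknown version " + version);
        }
        return decoder.apply(buf);
    }
}
```

Because old layouts are never removed from the map, a newer reader can still decode what an older writer produced.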

7. Serialization and compression. Again I think this is just confusion due
to sloppy writing. The compression will definitely be in the client. The
question is just whether it happens in the user thread when they are
enqueuing the request or in the network thread. Since machines now have
like 48 cores anything cpu-intensive in the network thread is a little
suspicious. Serialization though I think might be better left out entirely.

-Jay


On Sat, Jul 27, 2013 at 8:12 PM, Chris Riccomini <cr...@linkedin.com> wrote:

> Hey Jay,
>
> Reading over the wiki (and email thread). Here are some questions/comments:
>
> "Make the producer fully async to allow issuing sends to all brokers
> simultaneously and having multiple in-flight requests simultaneously. This
> will dramatically reduce the impact of latency on throughput (which is
> important with replication)."
>
> Can you say a bit more about this? You're only talking about the async
> case, right? If I set a producer to sync, acks=-1, producer.send() will
> still block as expected, right?
>
> "Move to server-side offset management will allow us to scale this
> facility which is currently a big scalability problem for high-commit rate
> consumers due to zk non scalability."
>
> Just confirming that the proposal still allows us to store a K/V map (or
> metadata), in addition to just offsets, right? This was in the older
> proposal that I saw, but I just wanted to make sure. The consumer API
> doesn't seem to reflect this.
>
> """
> SendResponse r = producer.send(new KafkaMessage(topic, key, message));
> r.onCompletion(new Runnable() {
>   public void run() { System.out.println("All done"); }
> });
> r.getOffset();
> r.getError();
> """
>
> To block (wait on the future to return) in the new API, I have to either
> call getOffset or getError? Or is onCompletion blocking?
>
> "List<MessageAndOffset> messages = consumer.poll(timeout);"
>
> Does it make sense to allow polling only specific topics? If we move to an
> epoll/selector model, would it be OK to just poll for certain topic
> handles, and let the other handles sit there with their socket buffers
> filling up? The use case that I'm thinking of is where you've registered
> two topics, but only want to consume from one for a while. In the API
> you've proposed, the consumer keeps feeding messages from the other topic,
> which you then have to buffer (and potentially run out of memory). A
> work-around is to have one consumer per-topic, in this case, but I'd
> rather just let the OS-level socket buffer do the buffering, if that's
> possible.
>
> "The timeout the user specifies will be purely to ensure we have a
> mechanism to give control back to the user even when no messages are
> delivered. It is up to the user to ensure poll() is called again within
> the heartbeat frequency set for the consumer group. Internally the timeout
> on our select() may use a shorter timeout to ensure the heartbeat
> frequency is met even when no messages are delivered."
>
> I don't think I understand this. What's the heartbeat you're talking about
> here? Is this the consumer membership group heartbeat?
>
> "We will introduce a set of RPC apis for managing partition assignment on
> the consumer side ... This set of APIs is orthogonal to the APIs for
> producing and consuming data, it is responsible for group membership."
>
> I'm a little confused. I think what you're saying is the API for managing
> partition assignment is totally generic, but the consumer is going to use
> it to manage its partition groups, right? I could use it for other
> partition assignment, if I wanted to, though, correct? Is this going to
> require instantiating a consumer to use, or will there be some other
> partition group API/connection/thingy that I can use?
>
> "Create a new group with the given name and the specified minimum
> heartbeat frequency. Return the id/host/port of the server acting as the
> controller for that group.
> If the ephemeral flag is set the group will disappear when the last client
> exits."
>
> What happens if create_group X is called more than once? Can this be used
> for leadership election (first call creates the group, and is notified
> that it's the leader)? There are a lot of gaps to fill in there, but it
> could be a pretty useful feature.
>
> "on receiving acknowledgements from all consumers of the group membership
> change the controller sends a group_changed message to all the new
> group members"
>
> Suppose that an existing consumer owns partition 7 for a given topic. A
> new consumer joins the group, and the partition assignments work out such
> that the new consumer should own partition 7. As I understand it, this
> means that the old consumer should stop consuming from partition 7 when
> begin_group_change is sent, right? This means a) no one consumes partition
> 7 until consensus is gathered, and b) if any consumer dies before its
> ack is sent, you have to wait up to one full heartbeat before partition 7
> is consumed again, correct? What kind of heartbeat do we expect to be
> "normal"? 10s? 60s?
>
> Also, how are offsets handled during this transition? Should the old
> consumer checkpoint its offset for the topic/partition that it's
> relinquishing? Are duplicate messages going to be consumed during this
> transition (i.e. a message is consumed by both the old and new consumer)?
>
> Also^2, what exactly is specified in the begin_group_change notification?
> If you don't handle any partition assignment inside Kafka, don't the
> consumers all need to know who is in the group? The wiki lists no fields
> for this. It seems like you'd need something like [consumer1, consumer2,
> consumer3], or something, to do deterministic ring/hash-based partition
> assignment entirely client side. If this is the case, will Kafka assign
> consumer group IDs to each registered consumer? This would make it easier
> to implement client-side partition assignment.
>
> "Currently we do a lot of logging and metrics recording in the client. I
> think instead of handling reporting of metrics and logging we should
> instead incorporate this feedback in the client apis and allow the user to
> log or monitor this in the manner they choose. This will avoid dependence
> on a particular logging or metrics library and seems like a good policy."
>
> +1
>
> """
> How many jars should we have? I think we could do either
>
> * Option A: kafka-clients.jar and kafka-server.jar or
> * Option B: kafka-common.jar, kafka-producer.jar, kafka-consumer.jar, and
> kafka-server.jar
>
> I prefer Option A as it is simpler if we add an AdminApi--we won't need to
> introduce a whole new jar for it.
> """
>
> An argument for option B is that it would make it easier to consume from
> one version of Kafka and produce to another, provided that kafka-common is
> API and runtime (protocol) compatible (MirrorMaker). It could just be a
> pipe dream, since almost every version bump is likely to introduce some
> level of incompatibility in the API. A third (crazy) option is to have
> only two jars: kafka-producer, and kafka-consumer, but to have a third
> FOLDER called kafka-common in the source tree. Using SBT (heh), prior to
> the compile task, copy the kafka-common folder into non-colliding package
> spaces in both the kafka-consumer and kafka-producer jars. This would
> essentially result in two entirely stand-alone jars, allow for different
> consumer/producer versions, and allow for only a single "common" source
> tree. I know, it's crazy, but I thought I'd throw it out there anyway.
> Also not sure how well this idea would work with IDEs.
>
> "I am thinking of moving both serialization and compression out of the
> background send thread."
>
> I take this to mean that the consumer/producer API is just essentially
> send(byte[])/poll(byte[]). Making users do (de)serialization in their own
> code seems pretty reasonable, but I'm kind of bummed if I'm going to have
> to manage my own compression too. It's handy to just flip a switch and get
> compression out of the box. An example of this exact API would be LevelDB,
> which takes bytes-in/bytes-out, but can handle compression for you, if you
> wish (defaults to auto-compressing with Snappy, I think).
>
> Cheers,
> Chris
>
>
>
> On 7/26/13 3:41 PM, "Xavier Stevens" <xa...@gaikai.com> wrote:
>
> >+1 to making the API use bytes and push serialization into the client.
> >This
> >is effectively what I am doing currently anyway. I implemented a generic
> >Encoder<ByteString> which just passes the bytes through.
> >
> >I also like the idea of the client being written in pure Java. Interacting
> >with Scala code from Java isn't nearly as nice as the other way around.
> >
> >Just my 2 cents.
> >
> >-Xavier
> >
> >
> >
> >On Fri, Jul 26, 2013 at 2:46 PM, Jason Rosenberg <jb...@squareup.com>
> wrote:
> >
> >> Jay,
> >>
> >> This seems like a great direction.  Simplifying the consumer client
> >>would
> >> be a big win, and +1 for more native java client integration.
> >>
> >> On the last point, regarding memory usage for buffering per partition.
> >>I
> >> would think it could be possible to devise a dynamic queuing system, to
> >> allow higher volume partitions to have larger effective buffers than
> >> smaller, low-volume partitions.  Thus, if you reserve a fixed
> >> total.buffer.memory, you could allocate units of buffer space which
> >>could
> >> then be composed to make larger buffers (perhaps not necessarily
> >> contiguous).  The long-tail of low-volume partitions could also be
> >>moved to
> >> some sort of auxiliary, non-collated buffer space, as they are less
> >>likely
> >> to benefit from contiguous buffering anyway.
> >>
> >> Fun stuff.
> >>
> >> Jason
> >>
> >>
> >> On Fri, Jul 26, 2013 at 3:00 PM, Jay Kreps <ja...@gmail.com> wrote:
> >>
> >> > I sent around a wiki a few weeks back proposing a set of client
> >> > improvements that essentially amount to a rewrite of the producer and
> >> > consumer java clients.
> >> >
> >> > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> >> >
> >> > The below discussion assumes you have read this wiki.
> >> >
> >> > I started to do a little prototyping for the producer and wanted to
> >>share
> >> > some of the ideas that came up to get early feedback.
> >> >
> >> > First, a few simple but perhaps controversial things to discuss.
> >> >
> >> > Rollout
> >> > Phase 1: We add the new clients. No change on the server. Old clients
> >> still
> >> > exist. The new clients will be entirely in a new package so there
> >>will be
> >> > no possibility of name collision.
> >> > Phase 2: We swap out all shared code on the server to use the new
> >>client
> >> > stuff. At this point the old clients still exist but are essentially
> >> > deprecated.
> >> > Phase 3: We remove the old client code.
> >> >
> >> > Java
> >> > I think we should do the clients in java. Making our users deal with
> >> > scala's non-compatability issues and crazy stack traces causes people
> >>a
> >> lot
> >> > of pain. Furthermore we end up having to wrap everything now to get a
> >> > usable java api anyway for non-scala people. This does mean
> >>maintaining a
> >> > substantial chunk of java code, which is maybe less fun than scala.
> >>But
> >> > basically i think we should optimize for the end user and produce a
> >> > standalone pure-java jar with no dependencies.
> >> >
> >> > Jars
> >> > We definitely want to separate out the client jar. There is also a
> >>fair
> >> > amount of code shared between both (exceptions, protocol definition,
> >> utils,
> >> > and the message set implementation). Two approaches.
> >> > Two jar approach: split kafka.jar into kafka-clients.jar and
> >> > kafka-server.jar with the server depending on the clients. The
> >>advantage
> >> of
> >> > this is that it is simple. The disadvantage is that things like utils
> >>and
> >> > protocol definition will be in the client jar though technical they
> >> belong
> >> > equally to the server.
> >> > Many jar approach: split kafka.jar into kafka-common.jar,
> >> > kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
> >> > kafka-server.jar. The disadvantage of this is that the user needs two
> >> jars
> >> > (common + something) which is for sure going to confuse people. I also
> >> > think this will tend to spawn more jars over time.
> >> >
> >> > Background threads
> >> > I am thinking of moving both serialization and compression out of the
> >> > background send thread. I will explain a little about this idea below.
> >> >
> >> > Serialization
> >> > I am not sure if we should handle serialization in the client at all.
> >> > Basically I wonder if our own API wouldn't just be a lot simpler if we
> >> took
> >> > a byte[] key and byte[] value and let people serialize stuff
> >>themselves.
> >> > Injecting a class name for us to create the serializer is more
> >>roundabout
> >> > and has a lot of problems if the serializer itself requires a lot of
> >> > configuration or other objects to be instantiated.
> >> >
> >> > Partitioning
> >> > The real question with serialization is whether the partitioning
> >>should
> >> > happen on the java object or on the byte array key. The argument for
> >> doing
> >> > it on the java object is that it is easier to do something like a
> >>range
> >> > partition on the object. The problem with doing it on the object is
> >>that
> >> > the consumer may not be in java and so may not be able to reproduce
> >>the
> >> > partitioning. For example we currently use Object.hashCode which is a
> >> > little sketchy. We would be better off doing a standardized hash
> >>function
> >> > on the key bytes. If we want to give the partitioner access to the
> >> original
> >> > java object then obviously we need to handle serialization behind our
> >> api.
> >> >
> >> > Names
> >> > I think good names are important. I would like to rename the following
> >> > classes in the new client:
> >> >   Message=>Record: Now that the message has both a message and a key
> >>it
> >> is
> >> > more of a KeyedMessage. Another name for a KeyedMessage is a Record.
> >> >   MessageSet=>Records: This isn't too important but nit pickers
> >>complain
> >> > that it is not technically a Set but rather a List or Sequence but
> >> > MessageList sounds funny to me.
> >> >
> >> > The actual clients will not interact with these classes. They will
> >> interact
> >> > with a ProducerRecord and ConsumerRecord. The reason for having
> >>different
> >> > fields is because the different clients
> >> > Proposed producer API:
> >> > SendResponse r = producer.send(new ProducerRecord(topic, key, value))
> >> >
> >> > Protocol Definition
> >> >
> >> > Here is what I am thinking about protocol definition. I see a couple
> >>of
> >> > problems with what we are doing currently. First the protocol
> >>definition
> >> is
> >> > spread throughout a bunch of custom java objects. The error reporting
> >>in
> >> > these object is really terrible because they don't record the field
> >> names.
> >> > Furthermore people keep adding business logic into the protocol
> >>objects
> >> > which is pretty nasty.
> >> >
> >> > I would like to move to having a single Protocol.java file that
> >>defines
> >> the
> >> > protocol in a readable DSL. Here is what I am thinking:
> >> >
> >> >   public static Schema REQUEST_HEADER =
> >> >
> >> >     new Schema(new Field("api_key", INT16, "The id of the request
> >> type."),
> >> >
> >> >                new Field("api_version", INT16, "The version of the
> >> API."),
> >> >
> >> >                  new Field("correlation_id", INT32, "A user-supplied
> >> > integer value that will be passed back with the response"),
> >> >
> >> >                  new Field("client_id", STRING, "A user specified
> >> > identifier for the client making the request."));
> >> >
> >> > To parse one of these requests you would do
> >> >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> >> >    short apiKey = struct.get("api_key");
> >> >
> >> > Internally Struct is just an Object[] with one entry per field which
> >>is
> >> > populated from the schema. The mapping of name to array index is a
> >>hash
> >> > table lookup. We can optimize access for performance critical areas by
> >> > allowing:
> >> >    static Field apiKeyField = REQUEST_HEADER.getField("api_key"); //
> >>do
> >> > this once to lookup the index of the field
> >> >    ...
> >> >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> >> >    short apiKey = struct.get(apiKeyField); // now this is just an
> >>array
> >> > access
> >> >
> >> > One advantage of this is this level of indirection will make it really
> >> easy
> >> > for us to handle backwards compatability in a more principled way. The
> >> > protocol file will actually contain ALL versions of the schema and we
> >> will
> >> > always use the appropriate version to read the request (as specified
> >>in
> >> the
> >> > header).
> >> >
> >> > NIO layer
> >> >
> >> > The plan is to add a non-blocking multi-connection abstraction that
> >>would
> >> > be used by both clients.
> >> >
> >> > class Selector {
> >> >   /* create a new connection and associate it with the given id */
> >> >   public void connect(int id, InetSocketAddress address,
> >> intsendBufferSize,
> >> > int receiveBufferSize)
> >> >   /* wakeup this selector if it is currently awaiting data */
> >> >   public void wakeup()
> >> >   /* user provides sends, recieves, and a timeout. this method will
> >> > populate "completed" and "disconnects" lists. Method blocks for up to
> >>the
> >> > timeout waiting for data to read. */
> >> >   public void poll(long timeout, List<Send> sends, List<Send>
> >>completed,
> >> > List<Receive> receives, List<Integer> disconnects)
> >> > }
> >> >
> >> > The consumer and producer would then each define their own logic to
> >> manage
> >> > their set of in-flight requests.
> >> >
> >> > Producer Implementation
> >> >
> >> > There are a couple of interesting changes I think we can make to the
> >> > producer implementation.
> >> >
> >> > We retain the single background "sender" thread.
> >> >
> >> > But we can remove the definition of sync vs async clients. We always
> >> return
> >> > a "future" response immediately. Both sync and async sends would go
> >> through
> >> > the buffering that we currently do for the async layer. This would
> >>mean
> >> > that even in sync mode while the event loop is doing network IO if
> >>many
> >> > requests accumulate they will be sent in a single batch. This
> >>effectively
> >> > acts as a kind of "group commit". So instead of having an "async" mode
> >> that
> >> > acts differently in some way you just have a max.delay time that
> >>controls
> >> > how long the client will linger waiting for more data to accumulate.
> >> > max.delay=0 is equivalent to the current sync producer.
> >> >
> >> > I would also propose changing our buffering strategy. Currently we
> >>queue
> >> > unserialized requests in a BlockingQueue. This is not ideal as it is
> >>very
> >> > difficult to reason about the memory usage of this queue. One 5MB
> >>message
> >> > may be bigger than 10k small messages. I propose that (1) we change
> >>our
> >> > queuing strategy to queue per-partition and (2) we directly write the
> >> > messages to the ByteBuffer which will eventually be sent and use that
> >>as
> >> > the "queue". The batch size should likewise be in bytes not in number
> >>of
> >> > messages.
> >> >
> >> > If you think about it our current queuing strategy doesn't really make
> >> > sense any more now that we always load balance over brokers. You set a
> >> > batch size of N and we do a request when we have N messages in queue
> >>but
> >> > this says nothing about the size of the requests that will be sent.
> >>You
> >> > might end up sending all N messages to one server or you might end up
> >> > sending 1 message to N different servers (totally defeating the
> >>purpose
> >> of
> >> > batching).
> >> >
> >> > There are two granularities of batching that could make sense: the
> >>broker
> >> > level or the partition level. We do the send requests at the broker
> >>level
> >> > but we do the disk IO at the partition level. I propose making the
> >>queues
> >> > per-partition rather than per broker to avoid having to reshuffle the
> >> > contents of queues when leadership changes. This could be debated,
> >> though.
> >> >
> >> > If you actually look at the byte path of the producer this approach
> >> allows
> >> > cleaning a ton of stuff up. We can do in-pace writes to the
> >>destination
> >> > buffer that we will eventually send. This does mean moving
> >>serialization
> >> > and compression to the user thread. But I think this is good as these
> >>may
> >> > be slow but aren't unpredictably slow.
> >> >
> >> > The per-partition queues are thus implemented with a bunch of
> >> pre-allocated
> >> > ByteBuffers sized to max.batch.size, when the buffer is full or the
> >>delay
> >> > time elapses that buffer is sent.
> >> >
> >> > By doing this we could actually just reuse the buffers when the send
> >>is
> >> > complete. This would be nice because since the buffers are used for
> >> > allocation they will likely fall out of young gen.
> >> >
> >> > A question to think about is how we want to bound memory usage. I
> >>think
> >> > what we want is the max.batch.size which controls the size of the
> >> > individual buffers and total.buffer.memory which controls the total
> >> memory
> >> > used by all buffers. One problem with this is that there is the
> >> possibility
> >> > of some fragmentation. I.e. image a situation with 5k partitions being
> >> > produced to, each getting a low but steady message rate. Giving each
> >>of
> >> > these a 1MB buffer would require 5GB of buffer space to have a buffer
> >>for
> >> > each partition. I'm not sure how bad this is since at least the memory
> >> > usage is predictable and the reality is that holding thousands of java
> >> > objects has huge overhead compared to contiguous byte arrays.
> >> >
> >> > -Jay
> >> >
> >>
>
>

Re: Client improvement discussion

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey Jay,

Reading over the wiki (and email thread). Here are some questions/comments:

"Make the producer fully async to allow issuing sends to all brokers
simultaneously and having multiple in-flight requests simultaneously. This
will dramatically reduce the impact of latency on throughput (which is
important with replication)."

Can you say a bit more about this? You're only talking about the async
case, right? If I set a producer to sync, acks=-1, producer.send() will
still block as expected, right?

"Move to server-side offset management will allow us to scale this
facility which is currently a big scalability problem for high-commit rate
consumers due to zk non scalability."

Just confirming that the proposal still allows us to store a K/V map (or
metadata), in addition to just offsets, right? This was in the older
proposal that I saw, but I just wanted to make sure. The consumer API
doesn't seem to reflect this.

"""
SendResponse r = producer.send(new KafkaMessage(topic, key, message));
r.onCompletion(new Runnable() {
  public void run() { System.out.println("All done"); }
});
r.getOffset();
r.getError();
"""

To block (wait on the future to return) in the new API, I have to either
call getOffset or getError? Or is onCompletion blocking?

"List<MessageAndOffset> messages = consumer.poll(timeout);"

Does it make sense to allow polling only specific topics? If we move to an
epoll/selector model, would it be OK to just poll for certain topic
handles, and let the other handles sit there with their socket buffers
filling up? The use case that I'm thinking of is where you've registered
two topics, but only want to consume from one for a while. In the API
you've proposed, the consumer keeps feeding messages from the other topic,
which you then have to buffer (and potentially run out of memory). A
work-around is to have one consumer per-topic, in this case, but I'd
rather just let the OS-level socket buffer do the buffering, if that's
possible.

"The timeout the user specifies will be purely to ensure we have a
mechanism to give control back to the user even when no messages are
delivered. It is up to the user to ensure poll() is called again within
the heartbeat frequency set for the consumer group. Internally the timeout
on our select() may use a shorter timeout to ensure the heartbeat
frequency is met even when no messages are delivered."

I don't think I understand this. What's the heartbeat you're talking about
here? Is this the consumer membership group heartbeat?

"We will introduce a set of RPC apis for managing partition assignment on
the consumer side ... This set of APIs is orthogonal to the APIs for
producing and consuming data, it is responsible for group membership."

I'm a little confused. I think what you're saying is the API for managing
partition assignment is totally generic, but the consumer is going to use
it to manage its partition groups, right? I could use it for other
partition assignment, if I wanted to, though, correct? Is this going to
require instantiating a consumer to use, or will there be some other
partition group API/connection/thingy that I can use?

"Create a new group with the given name and the specified minimum
heartbeat frequency. Return the id/host/port of the server acting as the
controller for that group.
If the ephemeral flag is set the group will disappear when the last client
exits."

What happens if create_group X is called more than once? Can this be used
for leadership election (first call creates the group, and is notified
that it's the leader)? There are a lot of gaps to fill in there, but it
could be a pretty useful feature.

"on receiving acknowledgements from all consumers of the group membership
change the controller sends a group_changed message to all the new
group members"

Suppose that an existing consumer owns partition 7 for a given topic. A
new consumer joins the group, and the partition assignments work out such
that the new consumer should own partition 7. As I understand it, this
means that the old consumer should stop consuming from partition 7 when
begin_group_change is sent, right? This means a) no one consumes partition
7 until consensus is gathered, and b) if any consumer dies before its
ack is sent, you have to wait up to one full heartbeat before partition 7
is consumed again, correct? What kind of heartbeat do we expect to be
"normal"? 10s? 60s?

Also, how are offsets handled during this transition? Should the old
consumer checkpoint its offset for the topic/partition that it's
relinquishing? Are duplicate messages going to be consumed during this
transition (i.e. a message is consumed by both the old and new consumer)?

Also^2, what exactly is specified in the begin_group_change notification?
If you don't handle any partition assignment inside Kafka, don't the
consumers all need to know who is in the group? The wiki lists no fields
for this. It seems like you'd need something like [consumer1, consumer2,
consumer3], or something, to do deterministic ring/hash-based partition
assignment entirely client side. If this is the case, will Kafka assign
consumer group IDs to each registered consumer? This would make it easier
to implement client-side partition assignment.

"Currently we do a lot of logging and metrics recording in the client. I
think instead of handling reporting of metrics and logging we should
instead incorporate this feedback in the client apis and allow the user to
log or monitor this in the manner they choose. This will avoid dependence
on a particular logging or metrics library and seems like a good policy."

+1

"""
How many jars should we have? I think we could do either

* Option A: kafka-clients.jar and kafka-server.jar or
* Option B: kafka-common.jar, kafka-producer.jar, kafka-consumer.jar, and
kafka-server.jar

I prefer Option A as it is simpler if we add an AdminApi--we won't need to
introduce a whole new jar for it.
"""

An argument for option B is that it would make it easier to consume from
one version of Kafka and produce to another, provided that kafka-common is
API and runtime (protocol) compatible (MirrorMaker). It could just be a
pipe dream, since almost every version bump is likely to introduce some
level of incompatibility in the API. A third (crazy) option is to have
only two jars: kafka-producer, and kafka-consumer, but to have a third
FOLDER called kafka-common in the source tree. Using SBT (heh), prior to
the compile task, copy the kafka-common folder into non-colliding package
spaces in both the kafka-consumer and kafka-producer jars. This would
essentially result in two entirely stand-alone jars, allow for different
consumer/producer versions, and allow for only a single "common" source
tree. I know, it's crazy, but I thought I'd throw it out there anyway.
Also not sure how well this idea would work with IDEs.

"I am thinking of moving both serialization and compression out of the
background send thread."

I take this to mean that the consumer/producer API is just essentially
send(byte[])/poll(byte[]). Making users do (de)serialization in their own
code seems pretty reasonable, but I'm kind of bummed if I'm going to have
to manage my own compression too. It's handy to just flip a switch and get
compression out of the box. An example of this exact API would be LevelDB,
which takes bytes-in/bytes-out, but can handle compression for you, if you
wish (defaults to auto-compressing with Snappy, I think).
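
To make the "flip a switch" idea concrete, here is a minimal sketch of a
bytes-in/bytes-out path with optional compression. BytesCodec is a
hypothetical helper made up for illustration, not an existing or proposed
Kafka class, and GZIP from the JDK stands in for Snappy only because it
needs no extra dependency.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical helper (not a Kafka API): bytes in, bytes out, compression behind one flag. */
final class BytesCodec {
    private final boolean compress;

    BytesCodec(boolean compress) {
        this.compress = compress;
    }

    /** Producer side: returns the payload untouched, or GZIP-compressed if the flag is set. */
    byte[] encode(byte[] payload) {
        if (!compress) return payload;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Closing the GZIP stream writes the trailer, so read the bytes only after the try block.
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    /** Consumer side: reverses encode() for a codec built with the same flag. */
    byte[] decode(byte[] stored) {
        if (!compress) return stored;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(stored))) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = gzip.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

The user still hands over plain byte arrays either way; the only thing
that changes is one constructor argument (or, in a real client, presumably
one config property).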

Cheers,
Chris





Re: Client improvement discussion

Posted by Xavier Stevens <xa...@gaikai.com>.
+1 to making the API use bytes and pushing serialization into the client. This
is effectively what I am doing currently anyway. I implemented a generic
Encoder<ByteString> which just passes the bytes through.
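
For anyone unfamiliar with the trick, a pass-through encoder can be
sketched roughly as follows. The Encoder interface here is a local
stand-in modeled loosely on the producer's serializer hook (its exact
shape is an assumption), and byte[] is used in place of ByteString so the
snippet has no dependencies.

```java
/** Local stand-in for a serializer hook; hypothetical, not the actual Kafka interface. */
interface Encoder<T> {
    byte[] toBytes(T value);
}

/** The caller has already serialized the value, so the encoder just hands the bytes back. */
final class PassThroughEncoder implements Encoder<byte[]> {
    @Override
    public byte[] toBytes(byte[] value) {
        return value; // no copy, no transformation
    }
}
```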

I also like the idea of the client being written in pure Java. Interacting
with Scala code from Java isn't nearly as nice as the other way around.

Just my 2 cents.

-Xavier




Re: Client improvement discussion

Posted by Jason Rosenberg <jb...@squareup.com>.
Jay,

This seems like a great direction.  Simplifying the consumer client would
be a big win, and +1 for more native java client integration.

On the last point, regarding memory usage for buffering per partition: I
would think it could be possible to devise a dynamic queuing system, to
allow higher volume partitions to have larger effective buffers than
smaller, low-volume partitions.  Thus, if you reserve a fixed
total.buffer.memory, you could allocate units of buffer space which could
then be composed to make larger buffers (perhaps not necessarily
contiguous).  The long-tail of low-volume partitions could also be moved to
some sort of auxiliary, non-collated buffer space, as they are less likely
to benefit from contiguous buffering anyway.
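
A rough sketch of the unit-based idea, under stated assumptions: the
class name ChunkedBufferPool, the chunkSize parameter, and the
append/drain methods are all made up for illustration, and the sketch is
single-threaded and skips the auxiliary space for the long tail. A fixed
total.buffer.memory is cut into equal chunks, and each partition chains
together as many chunks as its volume needs.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical pool: fixed total memory, handed out to partitions in fixed-size chunks. */
final class ChunkedBufferPool {
    private final int chunkSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private final Map<Integer, List<ByteBuffer>> inUse = new HashMap<>();

    ChunkedBufferPool(int totalBufferMemory, int chunkSize) {
        this.chunkSize = chunkSize;
        for (int i = 0; i < totalBufferMemory / chunkSize; i++) {
            free.push(ByteBuffer.allocate(chunkSize));
        }
    }

    /** Appends a record to the partition's chunk chain; returns false if the pool cannot fit it. */
    boolean append(int partition, byte[] record) {
        List<ByteBuffer> chain = inUse.get(partition);
        int available = (chain == null || chain.isEmpty())
                ? 0 : chain.get(chain.size() - 1).remaining();
        int shortfall = Math.max(0, record.length - available);
        int chunksNeeded = (shortfall + chunkSize - 1) / chunkSize;
        if (chunksNeeded > free.size()) return false; // check first so a record is never half-written

        if (chain == null) {
            chain = new ArrayList<>();
            inUse.put(partition, chain);
        }
        int offset = 0;
        while (offset < record.length) {
            ByteBuffer tail = chain.isEmpty() ? null : chain.get(chain.size() - 1);
            if (tail == null || !tail.hasRemaining()) {
                tail = free.pop(); // busy partitions simply end up holding more chunks
                chain.add(tail);
            }
            int n = Math.min(tail.remaining(), record.length - offset);
            tail.put(record, offset, n);
            offset += n;
        }
        return true;
    }

    /** Stands in for a completed send: recycles the partition's chunks, returns the bytes drained. */
    int drain(int partition) {
        List<ByteBuffer> chain = inUse.remove(partition);
        if (chain == null) return 0;
        int bytes = 0;
        for (ByteBuffer chunk : chain) {
            bytes += chunk.position();
            chunk.clear();
            free.push(chunk);
        }
        return bytes;
    }

    int freeChunks() {
        return free.size();
    }
}
```

With 64 bytes of total memory and 16-byte chunks, a partition that writes
40 bytes holds three chunks while a partition that writes 10 bytes holds
one, which is the "larger effective buffers for higher volume" behavior
described above. The tradeoff is that a batch is no longer one contiguous
buffer.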

Fun stuff.

Jason



On Fri, Jul 26, 2013 at 3:00 PM, Jay Kreps <ja...@gmail.com> wrote:

> I sent around a wiki a few weeks back proposing a set of client
> improvements that essentially amount to a rewrite of the producer and
> consumer java clients.
>
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
>
> The below discussion assumes you have read this wiki.
>
> I started to do a little prototyping for the producer and wanted to share
> some of the ideas that came up to get early feedback.
>
> First, a few simple but perhaps controversial things to discuss.
>
> Rollout
> Phase 1: We add the new clients. No change on the server. Old clients still
> exist. The new clients will be entirely in a new package so there will be
> no possibility of name collision.
> Phase 2: We swap out all shared code on the server to use the new client
> stuff. At this point the old clients still exist but are essentially
> deprecated.
> Phase 3: We remove the old client code.
>
> Java
> I think we should do the clients in java. Making our users deal with
> scala's incompatibility issues and crazy stack traces causes people a lot
> of pain. Furthermore we end up having to wrap everything now to get a
> usable java api anyway for non-scala people. This does mean maintaining a
> substantial chunk of java code, which is maybe less fun than scala. But
> basically i think we should optimize for the end user and produce a
> standalone pure-java jar with no dependencies.
>
> Jars
> We definitely want to separate out the client jar. There is also a fair
> amount of code shared between both (exceptions, protocol definition, utils,
> and the message set implementation). Two approaches.
> Two jar approach: split kafka.jar into kafka-clients.jar and
> kafka-server.jar with the server depending on the clients. The advantage of
> this is that it is simple. The disadvantage is that things like utils and
> protocol definition will be in the client jar though technically they belong
> equally to the server.
> Many jar approach: split kafka.jar into kafka-common.jar,
> kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
> kafka-server.jar. The disadvantage of this is that the user needs two jars
> (common + something) which is for sure going to confuse people. I also
> think this will tend to spawn more jars over time.
>
> Background threads
> I am thinking of moving both serialization and compression out of the
> background send thread. I will explain a little about this idea below.
>
> Serialization
> I am not sure if we should handle serialization in the client at all.
> Basically I wonder if our own API wouldn't just be a lot simpler if we took
> a byte[] key and byte[] value and let people serialize stuff themselves.
> Injecting a class name for us to create the serializer is more roundabout
> and has a lot of problems if the serializer itself requires a lot of
> configuration or other objects to be instantiated.
>
> Partitioning
> The real question with serialization is whether the partitioning should
> happen on the java object or on the byte array key. The argument for doing
> it on the java object is that it is easier to do something like a range
> partition on the object. The problem with doing it on the object is that
> the consumer may not be in java and so may not be able to reproduce the
> partitioning. For example we currently use Object.hashCode which is a
> little sketchy. We would be better off doing a standardized hash function
> on the key bytes. If we want to give the partitioner access to the original
> java object then obviously we need to handle serialization behind our api.
>
> Names
> I think good names are important. I would like to rename the following
> classes in the new client:
>   Message=>Record: Now that the message has both a message and a key it is
> more of a KeyedMessage. Another name for a KeyedMessage is a Record.
>   MessageSet=>Records: This isn't too important but nit pickers complain
> that it is not technically a Set but rather a List or Sequence but
> MessageList sounds funny to me.
>
> The actual clients will not interact with these classes. They will interact
> with a ProducerRecord and ConsumerRecord. The reason for having different
> fields is because the different clients
> Proposed producer API:
> SendResponse r = producer.send(new ProducerRecord(topic, key, value))
>
> Protocol Definition
>
> Here is what I am thinking about protocol definition. I see a couple of
> problems with what we are doing currently. First the protocol definition is
> spread throughout a bunch of custom java objects. The error reporting in
> these objects is really terrible because they don't record the field names.
> Furthermore people keep adding business logic into the protocol objects
> which is pretty nasty.
>
> I would like to move to having a single Protocol.java file that defines the
> protocol in a readable DSL. Here is what I am thinking:
>
>   public static Schema REQUEST_HEADER =
>     new Schema(new Field("api_key", INT16, "The id of the request type."),
>                new Field("api_version", INT16, "The version of the API."),
>                new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"),
>                new Field("client_id", STRING, "A user specified identifier for the client making the request."));
>
> To parse one of these requests you would do
>    Struct struct = REQUEST_HEADER.parse(bytebuffer);
>    short apiKey = struct.get("api_key");
>
> Internally Struct is just an Object[] with one entry per field which is
> populated from the schema. The mapping of name to array index is a hash
> table lookup. We can optimize access for performance critical areas by
> allowing:
>    // do this once to look up the index of the field
>    static Field apiKeyField = REQUEST_HEADER.getField("api_key");
>    ...
>    Struct struct = REQUEST_HEADER.parse(bytebuffer);
>    short apiKey = struct.get(apiKeyField); // now this is just an array access
>
> One advantage of this is that this level of indirection will make it really easy
> for us to handle backwards compatibility in a more principled way. The
> protocol file will actually contain ALL versions of the schema and we will
> always use the appropriate version to read the request (as specified in the
> header).
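
To make the Schema/Struct idea concrete, here is a small self-contained sketch of how it might fit together. Everything beyond the names Schema, Field, Struct, parse, get and getField is an assumption for illustration: the WireType enum, the STRING encoding (an INT16 length followed by UTF-8 bytes), and get() returning Object so that the caller casts. Storing the index on the Field also means a Field instance cannot be shared between schemas in this sketch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical wire types; STRING is assumed to be an INT16 length plus UTF-8 bytes. */
enum WireType {
    INT16 { Object read(ByteBuffer b) { return b.getShort(); } },
    INT32 { Object read(ByteBuffer b) { return b.getInt(); } },
    STRING {
        Object read(ByteBuffer b) {
            byte[] bytes = new byte[b.getShort()];
            b.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }
    };
    abstract Object read(ByteBuffer b);
}

final class Field {
    final String name;
    final WireType type;
    final String doc;
    int index = -1; // assigned when the field is added to a Schema
    Field(String name, WireType type, String doc) {
        this.name = name;
        this.type = type;
        this.doc = doc;
    }
}

final class Schema {
    private final Field[] fields;
    private final Map<String, Field> byName = new HashMap<>();

    Schema(Field... fields) {
        this.fields = fields;
        for (int i = 0; i < fields.length; i++) {
            fields[i].index = i;
            byName.put(fields[i].name, fields[i]);
        }
    }

    /** Hash lookup by name; callers on a hot path can do this once and keep the Field. */
    Field getField(String name) {
        Field f = byName.get(name);
        if (f == null) throw new IllegalArgumentException("No such field: " + name);
        return f;
    }

    /** Reads the fields in declaration order into an Object[] with one entry per field. */
    Struct parse(ByteBuffer buffer) {
        Object[] values = new Object[fields.length];
        for (Field f : fields) values[f.index] = f.type.read(buffer);
        return new Struct(this, values);
    }
}

final class Struct {
    private final Schema schema;
    private final Object[] values;
    Struct(Schema schema, Object[] values) {
        this.schema = schema;
        this.values = values;
    }
    Object get(String name) { return values[schema.getField(name).index]; } // hash lookup
    Object get(Field field) { return values[field.index]; }                 // plain array access
}
```

Supporting several protocol versions would then presumably amount to keeping one Schema per version and picking the right one from the api_version in the header before parsing the rest of the request.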
>
> NIO layer
>
> The plan is to add a non-blocking multi-connection abstraction that would
> be used by both clients.
>
> class Selector {
>   /* create a new connection and associate it with the given id */
>   public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)
>   /* wake up this selector if it is currently awaiting data */
>   public void wakeup()
>   /* user provides sends, receives, and a timeout. this method will
>      populate "completed" and "disconnects" lists. Method blocks for up to the
>      timeout waiting for data to read. */
>   public void poll(long timeout, List<Send> sends, List<Send> completed, List<Receive> receives, List<Integer> disconnects)
> }
>
> The consumer and producer would then each define their own logic to manage
> their set of in-flight requests.
>
> Producer Implementation
>
> There are a couple of interesting changes I think we can make to the
> producer implementation.
>
> We retain the single background "sender" thread.
>
> But we can remove the definition of sync vs async clients. We always return
> a "future" response immediately. Both sync and async sends would go through
> the buffering that we currently do for the async layer. This would mean
> that even in sync mode while the event loop is doing network IO if many
> requests accumulate they will be sent in a single batch. This effectively
> acts as a kind of "group commit". So instead of having an "async" mode that
> acts differently in some way you just have a max.delay time that controls
> how long the client will linger waiting for more data to accumulate.
> max.delay=0 is equivalent to the current sync producer.
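
A minimal sketch of the "always return a future" idea, under heavy assumptions: the class name FutureProducerSketch is made up, there is no network IO or batching at all, and a Long offset stands in for the SendResponse. It only shows that a single API can serve both styles, with a "sync" caller simply blocking on the returned future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical producer facade: one background "sender" thread, every send returns a future. */
final class FutureProducerSketch implements AutoCloseable {
    // Daemon thread so a forgotten close() does not keep the JVM alive.
    private final ExecutorService sender = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sender");
        t.setDaemon(true);
        return t;
    });
    private final AtomicLong nextOffset = new AtomicLong();

    /** Stand-in for a real send: the "response" is just a fake, increasing offset. */
    CompletableFuture<Long> send(byte[] key, byte[] value) {
        return CompletableFuture.supplyAsync(nextOffset::getAndIncrement, sender);
    }

    @Override
    public void close() {
        sender.shutdown();
    }

    public static void main(String[] args) {
        try (FutureProducerSketch producer = new FutureProducerSketch()) {
            // "async" style: register a callback and move on
            producer.send(new byte[0], new byte[] {1})
                    .thenAccept(offset -> System.out.println("acked at offset " + offset));
            // "sync" style: block on the same kind of future
            long offset = producer.send(new byte[0], new byte[] {2}).join();
            System.out.println("sync send returned offset " + offset);
        }
    }
}
```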
>
> I would also propose changing our buffering strategy. Currently we queue
> unserialized requests in a BlockingQueue. This is not ideal as it is very
> difficult to reason about the memory usage of this queue. One 5MB message
> may be bigger than 10k small messages. I propose that (1) we change our
> queuing strategy to queue per-partition and (2) we directly write the
> messages to the ByteBuffer which will eventually be sent and use that as
> the "queue". The batch size should likewise be in bytes not in number of
> messages.
>
> If you think about it our current queuing strategy doesn't really make
> sense any more now that we always load balance over brokers. You set a
> batch size of N and we do a request when we have N messages in queue but
> this says nothing about the size of the requests that will be sent. You
> might end up sending all N messages to one server or you might end up
> sending 1 message to N different servers (totally defeating the purpose of
> batching).
>
> There are two granularities of batching that could make sense: the broker
> level or the partition level. We do the send requests at the broker level
> but we do the disk IO at the partition level. I propose making the queues
> per-partition rather than per broker to avoid having to reshuffle the
> contents of queues when leadership changes. This could be debated, though.
>
> If you actually look at the byte path of the producer this approach allows
> cleaning a ton of stuff up. We can do in-place writes to the destination
> buffer that we will eventually send. This does mean moving serialization
> and compression to the user thread. But I think this is good as these may
> be slow but aren't unpredictably slow.
>
> The per-partition queues are thus implemented with a bunch of pre-allocated
> ByteBuffers sized to max.batch.size; when the buffer is full or the delay
> time elapses that buffer is sent.
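
Here is one way the per-partition ByteBuffer "queue" could look, as a sketch only. The class PartitionBatches, the 4-byte length prefix per record, and the single-threaded design are all assumptions for the example; a real implementation would need locking, compression and the actual message format.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-partition accumulator; single-threaded for brevity. */
final class PartitionBatches {
    private final int maxBatchSize; // in bytes, like the proposed max.batch.size
    private final Map<Integer, ByteBuffer> open = new HashMap<>();

    PartitionBatches(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /**
     * Writes one length-prefixed record in place. If the record does not fit in the
     * partition's current buffer, that buffer is flipped and returned so it can be
     * sent, and the record goes into a fresh buffer. Otherwise returns null.
     */
    ByteBuffer append(int partition, byte[] record) {
        int needed = 4 + record.length;
        if (needed > maxBatchSize) {
            throw new IllegalArgumentException("record larger than max batch size");
        }
        ByteBuffer current = open.computeIfAbsent(partition, p -> ByteBuffer.allocate(maxBatchSize));
        ByteBuffer full = null;
        if (current.remaining() < needed) {
            current.flip();
            full = current;
            current = ByteBuffer.allocate(maxBatchSize);
            open.put(partition, current);
        }
        current.putInt(record.length).put(record);
        return full;
    }

    /** Hands over whatever has accumulated, e.g. when the delay time elapses. */
    ByteBuffer drain(int partition) {
        ByteBuffer current = open.remove(partition);
        if (current == null || current.position() == 0) {
            return null;
        }
        current.flip();
        return current;
    }
}
```

Because the batch boundary is decided by bytes remaining in the buffer, the size of each request is known up front, which is the property the message-count based queue lacks.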
>
> By doing this we could actually just reuse the buffers when the send is
> complete. This would be nice since the buffers are used for
> allocation and will likely fall out of young gen.
>
> A question to think about is how we want to bound memory usage. I think
> what we want is the max.batch.size which controls the size of the
> individual buffers and total.buffer.memory which controls the total memory
> used by all buffers. One problem with this is that there is the possibility
> of some fragmentation. E.g., imagine a situation with 5k partitions being
> produced to, each getting a low but steady message rate. Giving each of
> these a 1MB buffer would require 5GB of buffer space to have a buffer for
> each partition. I'm not sure how bad this is since at least the memory
> usage is predictable and the reality is that holding thousands of java
> objects has huge overhead compared to contiguous byte arrays.
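
The two limits and the buffer reuse could be combined roughly as follows. This is a sketch under assumptions: BufferPoolSketch is a made-up name, it is not thread-safe, and it returns null when the budget is exhausted, whereas a real producer would have to decide whether to block or fail. The worstCaseBytes helper just restates the arithmetic from the example: 5,000 partitions at 1 MB each is about 5 GB.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical fixed-budget pool of equally sized buffers; not thread-safe. */
final class BufferPoolSketch {
    private final int batchSize;     // bytes per buffer, like max.batch.size
    private final long totalMemory;  // overall byte budget, like total.buffer.memory
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private long allocated = 0;      // bytes created so far, whether in use or in the free list

    BufferPoolSketch(int batchSize, long totalMemory) {
        this.batchSize = batchSize;
        this.totalMemory = totalMemory;
    }

    /** Returns an empty buffer, or null when the budget is exhausted. */
    ByteBuffer allocate() {
        ByteBuffer reused = free.pollFirst();
        if (reused != null) {
            return reused;
        }
        if (allocated + batchSize > totalMemory) {
            return null;
        }
        allocated += batchSize;
        return ByteBuffer.allocate(batchSize);
    }

    /** Hands a buffer back once its send has completed so it can be reused. */
    void release(ByteBuffer buffer) {
        buffer.clear();
        free.addFirst(buffer);
    }

    /** Bytes needed to give every partition its own buffer at the same time. */
    static long worstCaseBytes(int partitions, int batchSize) {
        return (long) partitions * batchSize;
    }
}
```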
>
> -Jay
>

Re: Client improvement discussion

Posted by Tommy Messbauer <to...@vast.com>.
+1 to this.

Our company uses Java, Python, and Node.js for most of our interactions.  Our
data science team uses Python and a little Java.  Most of our producers /
consumers are node.js (for RabbitMQ).  Our services use mostly Java with
some node.js.  Our application layers use node.js.

So if our company is going to migrate to Kafka (which is why we have people
monitoring this list), then we definitely need a way to interact with Kafka
from non-Java languages.

-T




On Mon, Jul 29, 2013 at 8:03 AM, Sybrandy, Casey <
Casey.Sybrandy@six3systems.com> wrote:

> In the past there was some discussion about having a C client for non-JVM
> languages.  Is this still planned as well?  Being able to work with Kafka
> from other languages would be a great thing.  Where I work, we interact
> with Kafka via Java and Ruby (producer), so having an official C library
> that can be used from within Ruby would make it easier to have the same
> version of the client in Java and Ruby.
>



-- 
*Tommy Messbauer*
*Vast*
512-763-7646

Re: Client improvement discussion

Posted by Jay Kreps <ja...@gmail.com>.
I believe there are some open source C++ producer implementations. At
linkedin we have a C++ implementation. We would like to open source this if
there is interest. We would like to eventually include a C++ consumer as
well.

-Jay


On Mon, Jul 29, 2013 at 6:03 AM, Sybrandy, Casey <
Casey.Sybrandy@six3systems.com> wrote:

> In the past there was some discussion about having a C client for non-JVM
> languages.  Is this still planned as well?  Being able to work with Kafka
> from other languages would be a great thing.  Where I work, we interact
> with Kafka via Java and Ruby (producer), so having an official C library
> that can be used from within Ruby would make it easier to have the same
> version of the client in Java and Ruby.
>

Re: Client improvement discussion

Posted by Jay Kreps <ja...@gmail.com>.
I believe there are some open source C++ producer implementations. At
linkedin we have a C++ implementation. We would like to open source this if
there is interest. We would like to eventually include a C++ consumer as
well.

-Jay


On Mon, Jul 29, 2013 at 6:03 AM, Sybrandy, Casey <
Casey.Sybrandy@six3systems.com> wrote:

> In the past there was some discussion about having a C client for non-JVM
> languages.  Is this still planned as well?  Being able to work with Kafka
> from other languages would be a great thing.  Where I work, we interact
> with Kafka via Java and Ruby (producer), so having an official C library
> that can be used from within Ruby would make it easier to have the same
> version of the client in Java and Ruby.
>
> -----Original Message-----
> From: Jay Kreps [mailto:jay.kreps@gmail.com]
> Sent: Friday, July 26, 2013 3:00 PM
> To: dev@kafka.apache.org; users@kafka.apache.org
> Subject: Client improvement discussion
>
> I sent around a wiki a few weeks back proposing a set of client
> improvements that essentially amount to a rewrite of the producer and
> consumer java clients.
>
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
>
> The below discussion assumes you have read this wiki.
>
> I started to do a little prototyping for the producer and wanted to share
> some of the ideas that came up to get early feedback.
>
> First, a few simple but perhaps controversial things to discuss.
>
> Rollout
> Phase 1: We add the new clients. No change on the server. Old clients
> still exist. The new clients will be entirely in a new package so there
> will be no possibility of name collision.
> Phase 2: We swap out all shared code on the server to use the new client
> stuff. At this point the old clients still exist but are essentially
> deprecated.
> Phase 3: We remove the old client code.
>
> Java
> I think we should do the clients in java. Making our users deal with
> scala's incompatibility issues and crazy stack traces causes people a lot
> of pain. Furthermore we end up having to wrap everything now to get a
> usable java api anyway for non-scala people. This does mean maintaining a
> substantial chunk of java code, which is maybe less fun than scala. But
> basically i think we should optimize for the end user and produce a
> standalone pure-java jar with no dependencies.
>
> Jars
> We definitely want to separate out the client jar. There is also a fair
> amount of code shared between both (exceptions, protocol definition, utils,
> and the message set implementation). Two approaches.
> Two jar approach: split kafka.jar into kafka-clients.jar and
> kafka-server.jar with the server depending on the clients. The advantage of
> this is that it is simple. The disadvantage is that things like utils and
> protocol definition will be in the client jar though technical they belong
> equally to the server.
> Many jar approach: split kafka.jar into kafka-common.jar,
> kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
> kafka-server.jar. The disadvantage of this is that the user needs two jars
> (common + something) which is for sure going to confuse people. I also
> think this will tend to spawn more jars over time.
>
> Background threads
> I am thinking of moving both serialization and compression out of the
> background send thread. I will explain a little about this idea below.
>
> Serialization
> I am not sure if we should handle serialization in the client at all.
> Basically I wonder if our own API wouldn't just be a lot simpler if we
> took a byte[] key and byte[] value and let people serialize stuff
> themselves.
> Injecting a class name for us to create the serializer is more roundabout
> and has a lot of problems if the serializer itself requires a lot of
> configuration or other objects to be instantiated.
>
> Partitioning
> The real question with serialization is whether the partitioning should
> happen on the java object or on the byte array key. The argument for doing
> it on the java object is that it is easier to do something like a range
> partition on the object. The problem with doing it on the object is that
> the consumer may not be in java and so may not be able to reproduce the
> partitioning. For example we currently use Object.hashCode which is a
> little sketchy. We would be better off doing a standardized hash function
> on the key bytes. If we want to give the partitioner access to the original
> java object then obviously we need to handle serialization behind our api.
>
> Names
> I think good names are important. I would like to rename the following
> classes in the new client:
>   Message=>Record: Now that the message has both a message and a key it is
> more of a KeyedMessage. Another name for a KeyedMessage is a Record.
>   MessageSet=>Records: This isn't too important but nit pickers complain
> that it is not technically a Set but rather a List or Sequence but
> MessageList sounds funny to me.
>
> The actual clients will not interact with these classes. They will
> interact with a ProducerRecord and ConsumerRecord. The reason for having
> different fields is because the different clients Proposed producer API:
> SendResponse r = producer.send(new ProducerRecord(topic, key, value))
>
> Protocol Definition
>
> Here is what I am thinking about protocol definition. I see a couple of
> problems with what we are doing currently. First the protocol definition is
> spread throughout a bunch of custom java objects. The error reporting in
> these object is really terrible because they don't record the field names.
> Furthermore people keep adding business logic into the protocol objects
> which is pretty nasty.
>
> I would like to move to having a single Protocol.java file that defines
> the protocol in a readable DSL. Here is what I am thinking:
>
>   public static Schema REQUEST_HEADER =
>
>     new Schema(new Field("api_key", INT16, "The id of the request type."),
>
>                new Field("api_version", INT16, "The version of the API."),
>
>                  new Field("correlation_id", INT32, "A user-supplied
> integer value that will be passed back with the response"),
>
>                  new Field("client_id", STRING, "A user specified
> identifier for the client making the request."));
>
> To parse one of these requests you would do
>    Struct struct = REQUEST_HEADER.parse(bytebuffer);
>    short apiKey = struct.get("api_key");
>
> Internally Struct is just an Object[] with one entry per field which is
> populated from the schema. The mapping of name to array index is a hash
> table lookup. We can optimize access for performance critical areas by
> allowing:
>    static Field apiKeyField = REQUEST_HEADER.getField("api_key"); // do
> this once to lookup the index of the field
>    ...
>    Struct struct = REQUEST_HEADER.parse(bytebuffer);
>    short apiKey = struct.get(apiKeyField); // now this is just an array
> access
>
> One advantage of this is this level of indirection will make it really
> easy for us to handle backwards compatability in a more principled way. The
> protocol file will actually contain ALL versions of the schema and we will
> always use the appropriate version to read the request (as specified in the
> header).
>
> NIO layer
>
> The plan is to add a non-blocking multi-connection abstraction that would
> be used by both clients.
>
> class Selector {
>   /* create a new connection and associate it with the given id */
>   public void connect(int id, InetSocketAddress address,
> intsendBufferSize, int receiveBufferSize)
>   /* wakeup this selector if it is currently awaiting data */
>   public void wakeup()
>   /* user provides sends, recieves, and a timeout. this method will
> populate "completed" and "disconnects" lists. Method blocks for up to the
> timeout waiting for data to read. */
>   public void poll(long timeout, List<Send> sends, List<Send> completed,
> List<Receive> receives, List<Integer> disconnects) }
>
> The consumer and producer would then each define their own logic to manage
> their set of in-flight requests.
>
> Producer Implementation
>
> There are a couple of interesting changes I think we can make to the
> producer implementation.
>
> We retain the single background "sender" thread.
>
> But we can remove the definition of sync vs async clients. We always
> return a "future" response immediately. Both sync and async sends would go
> through the buffering that we currently do for the async layer. This would
> mean that even in sync mode while the event loop is doing network IO if
> many requests accumulate they will be sent in a single batch. This
> effectively acts as a kind of "group commit". So instead of having an
> "async" mode that acts differently in some way you just have a max.delay
> time that controls how long the client will linger waiting for more data to
> accumulate.
> max.delay=0 is equivalent to the current sync producer.
>
> I would also propose changing our buffering strategy. Currently we queue
> unserialized requests in a BlockingQueue. This is not ideal as it is very
> difficult to reason about the memory usage of this queue. One 5MB message
> may be bigger than 10k small messages. I propose that (1) we change our
> queuing strategy to queue per-partition and (2) we directly write the
> messages to the ByteBuffer which will eventually be sent and use that as
> the "queue". The batch size should likewise be in bytes not in number of
> messages.
>
> If you think about it our current queuing strategy doesn't really make
> sense any more now that we always load balance over brokers. You set a
> batch size of N and we do a request when we have N messages in queue but
> this says nothing about the size of the requests that will be sent. You
> might end up sending all N messages to one server or you might end up
> sending 1 message to N different servers (totally defeating the purpose of
> batching).
>
> There are two granularities of batching that could make sense: the broker
> level or the partition level. We do the send requests at the broker level
> but we do the disk IO at the partition level. I propose making the queues
> per-partition rather than per broker to avoid having to reshuffle the
> contents of queues when leadership changes. This could be debated, though.
>
> If you actually look at the byte path of the producer this approach allows
> cleaning a ton of stuff up. We can do in-pace writes to the destination
> buffer that we will eventually send. This does mean moving serialization
> and compression to the user thread. But I think this is good as these may
> be slow but aren't unpredictably slow.
>
> The per-partition queues are thus implemented with a bunch of
> pre-allocated ByteBuffers sized to max.batch.size, when the buffer is full
> or the delay time elapses that buffer is sent.
>
> By doing this we could actually just reuse the buffers when the send is
> complete. This would be nice because since the buffers are used for
> allocation they will likely fall out of young gen.
>
> A question to think about is how we want to bound memory usage. I think
> what we want is the max.batch.size which controls the size of the
> individual buffers and total.buffer.memory which controls the total memory
> used by all buffers. One problem with this is that there is the possibility
> of some fragmentation. I.e. image a situation with 5k partitions being
> produced to, each getting a low but steady message rate. Giving each of
> these a 1MB buffer would require 5GB of buffer space to have a buffer for
> each partition. I'm not sure how bad this is since at least the memory
> usage is predictable and the reality is that holding thousands of java
> objects has huge overhead compared to contiguous byte arrays.
>
> -Jay
>

RE: Client improvement discussion

Posted by "Sybrandy, Casey" <Ca...@Six3Systems.com>.
In the past there was some discussion about having a C client for non-JVM languages.  Is this still planned as well?  Being able to work with Kafka from other languages would be a great thing.  Where I work, we interact with Kafka via Java and Ruby (producer), so having an official C library that can be used from within Ruby would make it easier to have the same version of the client in Java and Ruby.

-----Original Message-----
From: Jay Kreps [mailto:jay.kreps@gmail.com] 
Sent: Friday, July 26, 2013 3:00 PM
To: dev@kafka.apache.org; users@kafka.apache.org
Subject: Client improvement discussion

I sent around a wiki a few weeks back proposing a set of client improvements that essentially amount to a rewrite of the producer and consumer java clients.

https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite

The below discussion assumes you have read this wiki.

I started to do a little prototyping for the producer and wanted to share some of the ideas that came up to get early feedback.

First, a few simple but perhaps controversial things to discuss.

Rollout
Phase 1: We add the new clients. No change on the server. Old clients still exist. The new clients will be entirely in a new package so there will be no possibility of name collision.
Phase 2: We swap out all shared code on the server to use the new client stuff. At this point the old clients still exist but are essentially deprecated.
Phase 3: We remove the old client code.

Java
I think we should do the clients in java. Making our users deal with scala's incompatibility issues and crazy stack traces causes people a lot of pain. Furthermore we end up having to wrap everything now to get a usable java api anyway for non-scala people. This does mean maintaining a substantial chunk of java code, which is maybe less fun than scala. But basically I think we should optimize for the end user and produce a standalone pure-java jar with no dependencies.

Jars
We definitely want to separate out the client jar. There is also a fair amount of code shared between both (exceptions, protocol definition, utils, and the message set implementation). Two approaches.
Two jar approach: split kafka.jar into kafka-clients.jar and kafka-server.jar with the server depending on the clients. The advantage of this is that it is simple. The disadvantage is that things like utils and protocol definition will be in the client jar though technically they belong equally to the server.
Many jar approach: split kafka.jar into kafka-common.jar, kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and kafka-server.jar. The disadvantage of this is that the user needs two jars (common + something) which is for sure going to confuse people. I also think this will tend to spawn more jars over time.

Background threads
I am thinking of moving both serialization and compression out of the background send thread. I will explain a little about this idea below.

Serialization
I am not sure if we should handle serialization in the client at all.
Basically I wonder if our own API wouldn't just be a lot simpler if we took a byte[] key and byte[] value and let people serialize stuff themselves.
Injecting a class name for us to create the serializer is more roundabout and has a lot of problems if the serializer itself requires a lot of configuration or other objects to be instantiated.

Partitioning
The real question with serialization is whether the partitioning should happen on the java object or on the byte array key. The argument for doing it on the java object is that it is easier to do something like a range partition on the object. The problem with doing it on the object is that the consumer may not be in java and so may not be able to reproduce the partitioning. For example we currently use Object.hashCode which is a little sketchy. We would be better off doing a standardized hash function on the key bytes. If we want to give the partitioner access to the original java object then obviously we need to handle serialization behind our api.
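
A minimal sketch of partitioning on the key bytes rather than on the java object. The message above does not name a hash function, so CRC32 here is only an assumption, as are the class and method names. The point is that any client in any language can reproduce the result from the same bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

final class KeyBytesPartitioner {
    /** Maps serialized key bytes to a partition in [0, numPartitions). */
    static int partition(byte[] keyBytes, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        CRC32 crc = new CRC32(); // assumed hash; the thread does not pick one
        crc.update(keyBytes, 0, keyBytes.length);
        // getValue() is the unsigned 32-bit checksum held in a non-negative long
        return (int) (crc.getValue() % numPartitions);
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(partition(key, 8));
    }
}
```

Unlike Object.hashCode, the result depends only on the bytes, so a non-java consumer can compute the same mapping.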

Names
I think good names are important. I would like to rename the following classes in the new client:
  Message=>Record: Now that the message has both a message and a key it is more of a KeyedMessage. Another name for a KeyedMessage is a Record.
  MessageSet=>Records: This isn't too important but nit pickers complain that it is not technically a Set but rather a List or Sequence but MessageList sounds funny to me.

The actual clients will not interact with these classes. They will interact with a ProducerRecord and ConsumerRecord. The reason for having different fields is because the different clients
Proposed producer API:
SendResponse r = producer.send(new ProducerRecord(topic, key, value))

Protocol Definition

Here is what I am thinking about protocol definition. I see a couple of problems with what we are doing currently. First the protocol definition is spread throughout a bunch of custom java objects. The error reporting in these objects is really terrible because they don't record the field names.
Furthermore people keep adding business logic into the protocol objects which is pretty nasty.

I would like to move to having a single Protocol.java file that defines the protocol in a readable DSL. Here is what I am thinking:

  public static Schema REQUEST_HEADER =
    new Schema(new Field("api_key", INT16, "The id of the request type."),
               new Field("api_version", INT16, "The version of the API."),
               new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"),
               new Field("client_id", STRING, "A user specified identifier for the client making the request."));

To parse one of these requests you would do
   Struct struct = REQUEST_HEADER.parse(bytebuffer);
   short apiKey = struct.get("api_key");

Internally Struct is just an Object[] with one entry per field which is populated from the schema. The mapping of name to array index is a hash table lookup. We can optimize access for performance-critical areas by allowing:
   static Field apiKeyField = REQUEST_HEADER.getField("api_key"); // do this once to look up the index of the field
   ...
   Struct struct = REQUEST_HEADER.parse(bytebuffer);
   short apiKey = struct.get(apiKeyField); // now this is just an array access

One advantage of this is this level of indirection will make it really easy for us to handle backwards compatibility in a more principled way. The protocol file will actually contain ALL versions of the schema and we will always use the appropriate version to read the request (as specified in the header).
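
A rough sketch of how the Schema/Struct idea and the per-version lookup could fit together. Only INT16 and INT32 are handled, the Type enum, the "partition" and "timeout_ms" body fields, BODY_VERSIONS and parseRequest are all hypothetical names for illustration, and get() returns Object so callers cast. This is not the proposed implementation, just one way the pieces described above could be wired.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

enum Type { INT16, INT32 }

final class Field {
    final String name;
    final Type type;
    final String doc;
    int index = -1; // slot in the Struct's Object[], assigned by the owning Schema

    Field(String name, Type type, String doc) {
        this.name = name;
        this.type = type;
        this.doc = doc;
    }
}

final class Schema {
    private final Field[] fields;
    private final Map<String, Field> byName = new HashMap<>();

    Schema(Field... fields) {
        this.fields = fields;
        for (int i = 0; i < fields.length; i++) {
            fields[i].index = i;
            byName.put(fields[i].name, fields[i]);
        }
    }

    Field getField(String name) {
        Field f = byName.get(name);
        if (f == null) throw new IllegalArgumentException("Unknown field: " + name);
        return f;
    }

    Struct parse(ByteBuffer buffer) {
        Object[] values = new Object[fields.length];
        for (Field f : fields) {
            switch (f.type) {
                case INT16: values[f.index] = buffer.getShort(); break;
                case INT32: values[f.index] = buffer.getInt(); break;
            }
        }
        return new Struct(this, values);
    }
}

final class Struct {
    private final Schema schema;
    private final Object[] values;

    Struct(Schema schema, Object[] values) {
        this.schema = schema;
        this.values = values;
    }

    Object get(String name) { return values[schema.getField(name).index]; } // hash lookup, then array access
    Object get(Field field) { return values[field.index]; }                 // array access only
}

final class ProtocolSketch {
    static final Schema REQUEST_HEADER = new Schema(
        new Field("api_key", Type.INT16, "The id of the request type."),
        new Field("api_version", Type.INT16, "The version of the API."));

    // Hypothetical request body: one schema per version, indexed by api_version.
    static final Schema[] BODY_VERSIONS = {
        new Schema(new Field("partition", Type.INT32, "Target partition.")),
        new Schema(new Field("partition", Type.INT32, "Target partition."),
                   new Field("timeout_ms", Type.INT32, "Request timeout."))
    };

    private static final Field API_VERSION = REQUEST_HEADER.getField("api_version");

    /** Reads the header, then picks the body schema named by api_version. */
    static Struct parseRequest(ByteBuffer buffer) {
        Struct header = REQUEST_HEADER.parse(buffer);
        short version = (Short) header.get(API_VERSION);
        return BODY_VERSIONS[version].parse(buffer);
    }
}
```

Because every Field carries its name, a parse error could report which field failed, which addresses the error-reporting complaint above.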

NIO layer

The plan is to add a non-blocking multi-connection abstraction that would be used by both clients.

class Selector {
  /* create a new connection and associate it with the given id */
  public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)
  /* wakeup this selector if it is currently awaiting data */
  public void wakeup()
  /* user provides sends, receives, and a timeout. this method will populate "completed" and "disconnects" lists. Method blocks for up to the timeout waiting for data to read. */
  public void poll(long timeout, List<Send> sends, List<Send> completed, List<Receive> receives, List<Integer> disconnects)
}

The consumer and producer would then each define their own logic to manage their set of in-flight requests.

Producer Implementation

There are a couple of interesting changes I think we can make to the producer implementation.

We retain the single background "sender" thread.

But we can remove the definition of sync vs async clients. We always return a "future" response immediately. Both sync and async sends would go through the buffering that we currently do for the async layer. This would mean that even in sync mode while the event loop is doing network IO if many requests accumulate they will be sent in a single batch. This effectively acts as a kind of "group commit". So instead of having an "async" mode that acts differently in some way you just have a max.delay time that controls how long the client will linger waiting for more data to accumulate.
max.delay=0 is equivalent to the current sync producer.
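
To make the "always return a future" idea concrete, here is a small sketch under some loud assumptions: FutureBatch and completeBatch are made-up names, a Long offset stands in for the SendResponse mentioned earlier, and the max.delay timer and the actual network send are left out. Every send is buffered and returns immediately, and one acknowledgement completes all futures in the batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class FutureBatch {
    private final List<byte[]> records = new ArrayList<>();
    private final List<CompletableFuture<Long>> pending = new ArrayList<>();

    /** Called from user threads: buffer the record and return immediately. */
    synchronized CompletableFuture<Long> send(byte[] value) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        records.add(value);
        pending.add(result);
        return result;
    }

    /**
     * Called from the single sender thread once the batch has been acknowledged.
     * Completes every waiting future with a stand-in offset and returns the batch size.
     */
    synchronized int completeBatch(long baseOffset) {
        int n = pending.size();
        for (int i = 0; i < n; i++) {
            pending.get(i).complete(baseOffset + i);
        }
        records.clear();
        pending.clear();
        return n;
    }
}
```

A caller that wants the old sync behaviour would just block on the result, e.g. batch.send(bytes).get(), while an async caller keeps the future and moves on. Both paths share the same buffering, which is what gives the "group commit" effect.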

I would also propose changing our buffering strategy. Currently we queue unserialized requests in a BlockingQueue. This is not ideal as it is very difficult to reason about the memory usage of this queue. One 5MB message may be bigger than 10k small messages. I propose that (1) we change our queuing strategy to queue per-partition and (2) we directly write the messages to the ByteBuffer which will eventually be sent and use that as the "queue". The batch size should likewise be in bytes not in number of messages.

If you think about it our current queuing strategy doesn't really make sense any more now that we always load balance over brokers. You set a batch size of N and we do a request when we have N messages in queue but this says nothing about the size of the requests that will be sent. You might end up sending all N messages to one server or you might end up sending 1 message to N different servers (totally defeating the purpose of batching).

There are two granularities of batching that could make sense: the broker level or the partition level. We do the send requests at the broker level but we do the disk IO at the partition level. I propose making the queues per-partition rather than per broker to avoid having to reshuffle the contents of queues when leadership changes. This could be debated, though.

If you actually look at the byte path of the producer this approach allows cleaning a ton of stuff up. We can do in-place writes to the destination buffer that we will eventually send. This does mean moving serialization and compression to the user thread. But I think this is good as these may be slow but aren't unpredictably slow.

The per-partition queues are thus implemented with a bunch of pre-allocated ByteBuffers sized to max.batch.size; when the buffer is full or the delay time elapses that buffer is sent.
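
A sketch of the per-partition ByteBuffer "queue", sized in bytes rather than in messages. The class name, the 4-byte length prefix used for framing, and allocating a fresh buffer instead of reusing one from a pool are all assumptions for illustration. The delay-based flush is not modelled.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

final class PartitionBuffers {
    private final int maxBatchSize; // plays the role of max.batch.size, in bytes
    private final Map<Integer, ByteBuffer> buffers = new HashMap<>();

    PartitionBuffers(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /**
     * Writes a length-prefixed record directly into the partition's buffer.
     * Returns a full buffer (flipped, ready to send) if the record did not fit
     * in the current one, otherwise null.
     */
    synchronized ByteBuffer append(int partition, byte[] record) {
        int needed = 4 + record.length;
        if (needed > maxBatchSize) {
            throw new IllegalArgumentException("record larger than max batch size");
        }
        ByteBuffer ready = null;
        ByteBuffer buf = buffers.get(partition);
        if (buf != null && buf.remaining() < needed) {
            buf.flip();   // switch the full buffer from writing to reading
            ready = buf;  // hand it to the sender
            buf = null;
        }
        if (buf == null) {
            buf = ByteBuffer.allocate(maxBatchSize); // a real client might reuse a pooled buffer here
            buffers.put(partition, buf);
        }
        buf.putInt(record.length).put(record);
        return ready;
    }
}
```

Since the record is serialized before append is called, memory use per partition is bounded by the buffer size no matter how large or small individual messages are.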

By doing this we could actually just reuse the buffers when the send is complete. This would be nice because, since the buffers are used for allocation, they will likely fall out of young gen.

A question to think about is how we want to bound memory usage. I think what we want is the max.batch.size which controls the size of the individual buffers and total.buffer.memory which controls the total memory used by all buffers. One problem with this is that there is the possibility of some fragmentation. For example, imagine a situation with 5k partitions being produced to, each getting a low but steady message rate. Giving each of these a 1MB buffer would require 5GB of buffer space to have a buffer for each partition. I'm not sure how bad this is since at least the memory usage is predictable and the reality is that holding thousands of java objects has huge overhead compared to contiguous byte arrays.
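
The arithmetic in the fragmentation example can be checked with a few lines. The helper names and the 64MB cap are made up for illustration, and 1MB is taken as 1,000,000 bytes to match the round 5GB figure.

```java
final class BufferBudget {
    /** Worst case: every partition holds one fully allocated buffer. */
    static long worstCaseBytes(int partitions, long maxBatchSize) {
        return Math.multiplyExact((long) partitions, maxBatchSize);
    }

    /** How many partitions can hold a buffer at once under a total.buffer.memory cap. */
    static long buffersThatFit(long totalBufferMemory, long maxBatchSize) {
        return totalBufferMemory / maxBatchSize;
    }

    public static void main(String[] args) {
        long oneMb = 1_000_000L;
        System.out.println(worstCaseBytes(5_000, oneMb));      // 5000000000 bytes, i.e. 5GB
        System.out.println(buffersThatFit(64 * oneMb, oneMb)); // 64
    }
}
```

With a hypothetical 64MB total.buffer.memory and 1MB buffers, only 64 of the 5k partitions could hold a buffer at any moment, which is the tension between the two settings described above.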

-Jay

Re: Client improvement discussion

Posted by Chris Hogue <cs...@gmail.com>.
These sound like great steps. A couple of votes and questions:

1.  Moving serialization out and basing it all off of byte[] for key and
payload makes sense. Echoing a response below, we've ended up doing that in
some cases anyway, and the others do a trivial transform to bytes with an
Encoder.

2. On the single producer thread, we're actually suffering a bit from this
in 0.8, but it's mostly because compression and the blocking send happen on
this thread. In 0.7 since there was a thread-per-broker, a nice side-effect
was that compression and the blocking could "go wide", at least to the
number of brokers. If compression is moved out and the sends are now
non-blocking then this sounds like a nice improvement.

3. The wiki talks about static partition assignment for consumers. Just
adding a vote for that as we're currently working through how to do that
ourselves with the 0.8 consumer.

4. I'm curious how compression would interact with the new ByteBuffer
buffering you've described. If I'm following correctly you've said that
rather than queueing objects you'd end up doing in-place writes to the
pre-allocated ByteBuffer. Presumably this means the compression has already
happened on the user thread. But if there's no batching/buffering except in
the ByteBuffer, is there somewhere that multiple messages will be
compressed together (since it should result in better compression)? Maybe
there's still batching before this and I read too much into it?

5. I don't know if this is quite the right place to discuss it, but since
the producer has some involvement I'll throw it out there. The un-compress,
assign offsets, re-compress that happens on the broker with the built-in
compression API is a significant bottleneck that we're really trying to
avoid. As noted in another thread, we saw a throughput increase on the
order of 3x when we pre-batched and compressed the payloads before sending
it to the producer with 0.8.

I've not looked very closely at the wire-protocol, but if there was a way
for it to support in-place offset assignment even for compressed messages
it would be a huge win. Short of that we're fine taking the batch/compress
responsibility into user code, but it would be nice to have a way to do
that while retaining the built-in partition selection (i.e. semantic
partitioning) and other functionality of the producer. The new design may
already be an improvement in this area since it would move some
responsibility to the user thread.

Not sure if that's clear, but as the interfaces take shape it may be easier
to see how that will work.
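
On point 4 above, the claim that compressing messages together beats compressing them one at a time is easy to check. This sketch uses gzip from the JDK and made-up JSON-like sample messages; the class and method names are hypothetical and nothing here reflects how the producer would actually frame a compressed batch.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

final class BatchCompression {
    /** Size in bytes of the gzip-compressed form of data. */
    static int gzipSize(byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.size();
    }

    /** Total size when each message is compressed on its own. */
    static int sizeCompressedIndividually(byte[][] messages) {
        int total = 0;
        for (byte[] m : messages) total += gzipSize(m);
        return total;
    }

    /** Size when all messages are concatenated and compressed as one batch. */
    static int sizeCompressedTogether(byte[][] messages) {
        ByteArrayOutputStream joined = new ByteArrayOutputStream();
        for (byte[] m : messages) joined.write(m, 0, m.length);
        return gzipSize(joined.toByteArray());
    }

    /** Small, similar-looking messages, the case where batching helps most. */
    static byte[][] sampleMessages(int count) {
        byte[][] messages = new byte[count][];
        for (int i = 0; i < count; i++) {
            messages[i] = ("{\"user\":\"u" + i + "\",\"event\":\"click\"}")
                .getBytes(StandardCharsets.UTF_8);
        }
        return messages;
    }
}
```

Comparing sizeCompressedTogether against sizeCompressedIndividually on the sample messages shows the gap: each individually compressed message pays the gzip header and trailer and cannot share repeated strings with its neighbours.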

-Chris






On Fri, Jul 26, 2013 at 1:00 PM, Jay Kreps <ja...@gmail.com> wrote:

> I sent around a wiki a few weeks back proposing a set of client
> improvements that essentially amount to a rewrite of the producer and
> consumer java clients.
>
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
>
> The below discussion assumes you have read this wiki.
>
> I started to do a little prototyping for the producer and wanted to share
> some of the ideas that came up to get early feedback.
>
> First, a few simple but perhaps controversial things to discuss.
>
> Rollout
> Phase 1: We add the new clients. No change on the server. Old clients still
> exist. The new clients will be entirely in a new package so there will be
> no possibility of name collision.
> Phase 2: We swap out all shared code on the server to use the new client
> stuff. At this point the old clients still exist but are essentially
> deprecated.
> Phase 3: We remove the old client code.
>
> Java
> I think we should do the clients in java. Making our users deal with
> scala's incompatibility issues and crazy stack traces causes people a lot
> of pain. Furthermore we end up having to wrap everything now to get a
> usable java api anyway for non-scala people. This does mean maintaining a
> substantial chunk of java code, which is maybe less fun than scala. But
> basically I think we should optimize for the end user and produce a
> standalone pure-java jar with no dependencies.
>
> Jars
> We definitely want to separate out the client jar. There is also a fair
> amount of code shared between both (exceptions, protocol definition, utils,
> and the message set implementation). Two approaches.
> Two jar approach: split kafka.jar into kafka-clients.jar and
> kafka-server.jar with the server depending on the clients. The advantage of
> this is that it is simple. The disadvantage is that things like utils and
> protocol definition will be in the client jar though technically they belong
> equally to the server.
> Many jar approach: split kafka.jar into kafka-common.jar,
> kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
> kafka-server.jar. The disadvantage of this is that the user needs two jars
> (common + something) which is for sure going to confuse people. I also
> think this will tend to spawn more jars over time.
>
> Background threads
> I am thinking of moving both serialization and compression out of the
> background send thread. I will explain a little about this idea below.
>
> Serialization
> I am not sure if we should handle serialization in the client at all.
> Basically I wonder if our own API wouldn't just be a lot simpler if we took
> a byte[] key and byte[] value and let people serialize stuff themselves.
> Injecting a class name for us to create the serializer is more roundabout
> and has a lot of problems if the serializer itself requires a lot of
> configuration or other objects to be instantiated.
>
> Partitioning
> The real question with serialization is whether the partitioning should
> happen on the java object or on the byte array key. The argument for doing
> it on the java object is that it is easier to do something like a range
> partition on the object. The problem with doing it on the object is that
> the consumer may not be in java and so may not be able to reproduce the
> partitioning. For example we currently use Object.hashCode which is a
> little sketchy. We would be better off doing a standardized hash function
> on the key bytes. If we want to give the partitioner access to the original
> java object then obviously we need to handle serialization behind our api.
>
> Names
> I think good names are important. I would like to rename the following
> classes in the new client:
>   Message=>Record: Now that the message has both a message and a key it is
> more of a KeyedMessage. Another name for a KeyedMessage is a Record.
>   MessageSet=>Records: This isn't too important but nit pickers complain
> that it is not technically a Set but rather a List or Sequence but
> MessageList sounds funny to me.
>
> The actual clients will not interact with these classes. They will interact
> with a ProducerRecord and ConsumerRecord. The reason for having different
> fields is because the different clients
> Proposed producer API:
> SendResponse r = producer.send(new ProducerRecord(topic, key, value))
>
> Protocol Definition
>
> Here is what I am thinking about protocol definition. I see a couple of
> problems with what we are doing currently. First the protocol definition is
> spread throughout a bunch of custom java objects. The error reporting in
> these objects is really terrible because they don't record the field names.
> Furthermore people keep adding business logic into the protocol objects
> which is pretty nasty.
>
> I would like to move to having a single Protocol.java file that defines the
> protocol in a readable DSL. Here is what I am thinking:
>
>   public static Schema REQUEST_HEADER =
>     new Schema(new Field("api_key", INT16, "The id of the request type."),
>                new Field("api_version", INT16, "The version of the API."),
>                new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"),
>                new Field("client_id", STRING, "A user specified identifier for the client making the request."));
>
> To parse one of these requests you would do
>    Struct struct = REQUEST_HEADER.parse(bytebuffer);
>    short apiKey = struct.get("api_key");
>
> Internally Struct is just an Object[] with one entry per field which is
> populated from the schema. The mapping of name to array index is a hash
> table lookup. We can optimize access for performance-critical areas by
> allowing:
>    static Field apiKeyField = REQUEST_HEADER.getField("api_key"); // do this once to look up the index of the field
>    ...
>    Struct struct = REQUEST_HEADER.parse(bytebuffer);
>    short apiKey = struct.get(apiKeyField); // now this is just an array access
>
> One advantage of this is this level of indirection will make it really easy
> for us to handle backwards compatibility in a more principled way. The
> protocol file will actually contain ALL versions of the schema and we will
> always use the appropriate version to read the request (as specified in the
> header).
>
> NIO layer
>
> The plan is to add a non-blocking multi-connection abstraction that would
> be used by both clients.
>
> class Selector {
>   /* create a new connection and associate it with the given id */
>   public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)
>   /* wakeup this selector if it is currently awaiting data */
>   public void wakeup()
>   /* user provides sends, receives, and a timeout. this method will
> populate "completed" and "disconnects" lists. Method blocks for up to the
> timeout waiting for data to read. */
>   public void poll(long timeout, List<Send> sends, List<Send> completed, List<Receive> receives, List<Integer> disconnects)
> }
>
> The consumer and producer would then each define their own logic to manage
> their set of in-flight requests.
>
> Producer Implementation
>
> There are a couple of interesting changes I think we can make to the
> producer implementation.
>
> We retain the single background "sender" thread.
>
> But we can remove the definition of sync vs async clients. We always return
> a "future" response immediately. Both sync and async sends would go through
> the buffering that we currently do for the async layer. This would mean
> that even in sync mode while the event loop is doing network IO if many
> requests accumulate they will be sent in a single batch. This effectively
> acts as a kind of "group commit". So instead of having an "async" mode that
> acts differently in some way you just have a max.delay time that controls
> how long the client will linger waiting for more data to accumulate.
> max.delay=0 is equivalent to the current sync producer.
>
> I would also propose changing our buffering strategy. Currently we queue
> unserialized requests in a BlockingQueue. This is not ideal as it is very
> difficult to reason about the memory usage of this queue. One 5MB message
> may be bigger than 10k small messages. I propose that (1) we change our
> queuing strategy to queue per-partition and (2) we directly write the
> messages to the ByteBuffer which will eventually be sent and use that as
> the "queue". The batch size should likewise be in bytes not in number of
> messages.
>
> If you think about it our current queuing strategy doesn't really make
> sense any more now that we always load balance over brokers. You set a
> batch size of N and we do a request when we have N messages in queue but
> this says nothing about the size of the requests that will be sent. You
> might end up sending all N messages to one server or you might end up
> sending 1 message to N different servers (totally defeating the purpose of
> batching).
>
> There are two granularities of batching that could make sense: the broker
> level or the partition level. We do the send requests at the broker level
> but we do the disk IO at the partition level. I propose making the queues
> per-partition rather than per broker to avoid having to reshuffle the
> contents of queues when leadership changes. This could be debated, though.
>
> If you actually look at the byte path of the producer this approach allows
> cleaning a ton of stuff up. We can do in-place writes to the destination
> buffer that we will eventually send. This does mean moving serialization
> and compression to the user thread. But I think this is good as these may
> be slow but aren't unpredictably slow.
>
> The per-partition queues are thus implemented with a bunch of pre-allocated
> ByteBuffers sized to max.batch.size, when the buffer is full or the delay
> time elapses that buffer is sent.
>
> By doing this we could actually just reuse the buffers when the send is
> complete. This would be nice because since the buffers are used for
> allocation they will likely fall out of young gen.
>
> A question to think about is how we want to bound memory usage. I think
> what we want is the max.batch.size which controls the size of the
> individual buffers and total.buffer.memory which controls the total memory
> used by all buffers. One problem with this is that there is the possibility
> of some fragmentation. I.e. imagine a situation with 5k partitions being
> produced to, each getting a low but steady message rate. Giving each of
> these a 1MB buffer would require 5GB of buffer space to have a buffer for
> each partition. I'm not sure how bad this is since at least the memory
> usage is predictable and the reality is that holding thousands of java
> objects has huge overhead compared to contiguous byte arrays.
>
> -Jay
>

RE: Client improvement discussion

Posted by "Sybrandy, Casey" <Ca...@Six3Systems.com>.
In the past there was some discussion about having a C client for non-JVM languages.  Is this still planned as well?  Being able to work with Kafka from other languages would be a great thing.  Where I work, we interact with Kafka via Java and Ruby (producer), so having an official C library that can be used from within Ruby would make it easier to have the same version of the client in Java and Ruby.

-----Original Message-----
From: Jay Kreps [mailto:jay.kreps@gmail.com] 
Sent: Friday, July 26, 2013 3:00 PM
To: dev@kafka.apache.org; users@kafka.apache.org
Subject: Client improvement discussion

I sent around a wiki a few weeks back proposing a set of client improvements that essentially amount to a rewrite of the producer and consumer java clients.

https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite

The below discussion assumes you have read this wiki.

I started to do a little prototyping for the producer and wanted to share some of the ideas that came up to get early feedback.

First, a few simple but perhaps controversial things to discuss.

Rollout
Phase 1: We add the new clients. No change on the server. Old clients still exist. The new clients will be entirely in a new package so there will be no possibility of name collision.
Phase 2: We swap out all shared code on the server to use the new client stuff. At this point the old clients still exist but are essentially deprecated.
Phase 3: We remove the old client code.

Java
I think we should do the clients in java. Making our users deal with scala's compatibility issues and crazy stack traces causes people a lot of pain. Furthermore we end up having to wrap everything now to get a usable java api anyway for non-scala people. This does mean maintaining a substantial chunk of java code, which is maybe less fun than scala. But basically I think we should optimize for the end user and produce a standalone pure-java jar with no dependencies.

Jars
We definitely want to separate out the client jar. There is also a fair amount of code shared between both (exceptions, protocol definition, utils, and the message set implementation). Two approaches.
Two jar approach: split kafka.jar into kafka-clients.jar and kafka-server.jar with the server depending on the clients. The advantage of this is that it is simple. The disadvantage is that things like utils and protocol definition will be in the client jar though technically they belong equally to the server.
Many jar approach: split kafka.jar into kafka-common.jar, kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and kafka-server.jar. The disadvantage of this is that the user needs two jars (common + something) which is for sure going to confuse people. I also think this will tend to spawn more jars over time.

Background threads
I am thinking of moving both serialization and compression out of the background send thread. I will explain a little about this idea below.

Serialization
I am not sure if we should handle serialization in the client at all.
Basically I wonder if our own API wouldn't just be a lot simpler if we took a byte[] key and byte[] value and let people serialize stuff themselves.
Injecting a class name for us to create the serializer is more roundabout and has a lot of problems if the serializer itself requires a lot of configuration or other objects to be instantiated.

Partitioning
The real question with serialization is whether the partitioning should happen on the java object or on the byte array key. The argument for doing it on the java object is that it is easier to do something like a range partition on the object. The problem with doing it on the object is that the consumer may not be in java and so may not be able to reproduce the partitioning. For example we currently use Object.hashCode which is a little sketchy. We would be better off doing a standardized hash function on the key bytes. If we want to give the partitioner access to the original java object then obviously we need to handle serialization behind our api.
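
A minimal sketch of partitioning on the serialized key bytes rather than on the java object, so that a non-java client could reproduce the same mapping. CRC32 is only an assumed stand-in for whatever standardized hash function would actually be chosen, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical helper: picks a partition from the serialized key bytes. */
final class ByteKeyPartitioner {
    /** CRC32 is an assumption here, standing in for "a standardized hash function". */
    static int partition(byte[] keyBytes, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        CRC32 crc = new CRC32();
        crc.update(keyBytes, 0, keyBytes.length);
        // getValue() returns the 32-bit checksum as a non-negative long.
        return (int) (crc.getValue() % numPartitions);
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(partition(key, 8));
    }
}
```

Because the input is just bytes, any client that implements the same hash gets the same partition for the same key, which Object.hashCode cannot promise.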

Names
I think good names are important. I would like to rename the following classes in the new client:
  Message=>Record: Now that the message has both a message and a key it is more of a KeyedMessage. Another name for a KeyedMessage is a Record.
  MessageSet=>Records: This isn't too important but nit pickers complain that it is not technically a Set but rather a List or Sequence but MessageList sounds funny to me.

The actual clients will not interact with these classes. They will interact with a ProducerRecord and ConsumerRecord. The reason for having different fields is because the different clients
Proposed producer API:
SendResponse r = producer.send(new ProducerRecord(topic, key, value))

Protocol Definition

Here is what I am thinking about protocol definition. I see a couple of problems with what we are doing currently. First the protocol definition is spread throughout a bunch of custom java objects. The error reporting in these objects is really terrible because they don't record the field names.
Furthermore people keep adding business logic into the protocol objects which is pretty nasty.

I would like to move to having a single Protocol.java file that defines the protocol in a readable DSL. Here is what I am thinking:

  public static Schema REQUEST_HEADER =
    new Schema(new Field("api_key", INT16, "The id of the request type."),
               new Field("api_version", INT16, "The version of the API."),
               new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"),
               new Field("client_id", STRING, "A user specified identifier for the client making the request."));

To parse one of these requests you would do
   Struct struct = REQUEST_HEADER.parse(bytebuffer);
   short apiKey = struct.get("api_key");

Internally Struct is just an Object[] with one entry per field which is populated from the schema. The mapping of name to array index is a hash table lookup. We can optimize access for performance critical areas by allowing:
   static Field apiKeyField = REQUEST_HEADER.getField("api_key"); // do this once to lookup the index of the field
   ...
   Struct struct = REQUEST_HEADER.parse(bytebuffer);
   short apiKey = struct.get(apiKeyField); // now this is just an array access

One advantage of this is this level of indirection will make it really easy for us to handle backwards compatibility in a more principled way. The protocol file will actually contain ALL versions of the schema and we will always use the appropriate version to read the request (as specified in the header).
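
To make the Schema/Struct idea concrete, here is a rough sketch of how the pieces could fit together. Everything in it is an assumption for illustration: the FieldType enum, the 2-byte length prefix on strings, and the choice to have get() return Object (so callers cast) are not part of the proposal above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical field types; only enough to read the request header. */
enum FieldType {
    INT16 { Object read(ByteBuffer b) { return b.getShort(); } },
    INT32 { Object read(ByteBuffer b) { return b.getInt(); } },
    STRING {
        // Assumes a 2-byte length prefix followed by UTF-8 bytes.
        Object read(ByteBuffer b) {
            byte[] bytes = new byte[b.getShort()];
            b.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }
    };
    abstract Object read(ByteBuffer b);
}

final class Field {
    final String name;
    final FieldType type;
    final String doc;
    int index = -1; // assigned by the Schema that owns this field

    Field(String name, FieldType type, String doc) {
        this.name = name;
        this.type = type;
        this.doc = doc;
    }
}

final class Schema {
    private final Field[] fields;
    private final Map<String, Field> byName = new HashMap<>();

    Schema(Field... fields) {
        this.fields = fields;
        for (int i = 0; i < fields.length; i++) {
            fields[i].index = i;
            byName.put(fields[i].name, fields[i]);
        }
    }

    /** Look the field up once, then reuse it for array-index access. */
    Field getField(String name) {
        Field f = byName.get(name);
        if (f == null) throw new IllegalArgumentException("No such field: " + name);
        return f;
    }

    /** Reads the fields in declaration order into an Object[]. */
    Struct parse(ByteBuffer buffer) {
        Object[] values = new Object[fields.length];
        for (Field f : fields) values[f.index] = f.type.read(buffer);
        return new Struct(this, values);
    }
}

final class Struct {
    private final Schema schema;
    private final Object[] values;

    Struct(Schema schema, Object[] values) {
        this.schema = schema;
        this.values = values;
    }

    Object get(String name) { return values[schema.getField(name).index]; } // hash lookup
    Object get(Field field) { return values[field.index]; }                 // array access
}
```

With this shape a caller would write something like `short apiKey = (Short) struct.get("api_key");`. Because every Field carries its name, a parse error can report which field failed, which is the error-reporting gain mentioned above.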

NIO layer

The plan is to add a non-blocking multi-connection abstraction that would be used by both clients.

class Selector {
  /* create a new connection and associate it with the given id */
  public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)
  /* wakeup this selector if it is currently awaiting data */
  public void wakeup()
  /* user provides sends, receives, and a timeout. this method will populate "completed" and "disconnects" lists. Method blocks for up to the timeout waiting for data to read. */
  public void poll(long timeout, List<Send> sends, List<Send> completed, List<Receive> receives, List<Integer> disconnects)
}

The consumer and producer would then each define their own logic to manage their set of in-flight requests.

Producer Implementation

There are a couple of interesting changes I think we can make to the producer implementation.

We retain the single background "sender" thread.

But we can remove the definition of sync vs async clients. We always return a "future" response immediately. Both sync and async sends would go through the buffering that we currently do for the async layer. This would mean that even in sync mode while the event loop is doing network IO if many requests accumulate they will be sent in a single batch. This effectively acts as a kind of "group commit". So instead of having an "async" mode that acts differently in some way you just have a max.delay time that controls how long the client will linger waiting for more data to accumulate.
max.delay=0 is equivalent to the current sync producer.
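
A small sketch of the "always return a future" shape, assuming a hypothetical FutureProducerSketch class. The network send is faked by handing out an incrementing offset on the sender thread, and batching and the max.delay linger are left out, so this only illustrates that sync and async callers share one path.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical producer shape: one send path, always asynchronous underneath. */
final class FutureProducerSketch implements AutoCloseable {
    // Single background "sender" thread, as in the proposal.
    private final ExecutorService sender = Executors.newSingleThreadExecutor();
    // Stand-in for a broker-assigned offset; only touched by the sender thread.
    private long nextOffset = 0;

    /** Always returns immediately; the future completes once the "send" is done. */
    CompletableFuture<Long> send(byte[] key, byte[] value) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        // A real sender would batch and do network IO here; this just fakes an ack.
        sender.execute(() -> result.complete(nextOffset++));
        return result;
    }

    /** "Sync" behaviour is just waiting on the future. */
    long sendSync(byte[] key, byte[] value) {
        return send(key, value).join();
    }

    @Override
    public void close() {
        sender.shutdown();
    }
}
```

An async caller keeps the returned future and carries on, while a sync caller blocks on it, so there is no separate sync client to maintain.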

I would also propose changing our buffering strategy. Currently we queue unserialized requests in a BlockingQueue. This is not ideal as it is very difficult to reason about the memory usage of this queue. One 5MB message may be bigger than 10k small messages. I propose that (1) we change our queuing strategy to queue per-partition and (2) we directly write the messages to the ByteBuffer which will eventually be sent and use that as the "queue". The batch size should likewise be in bytes not in number of messages.

If you think about it our current queuing strategy doesn't really make sense any more now that we always load balance over brokers. You set a batch size of N and we do a request when we have N messages in queue but this says nothing about the size of the requests that will be sent. You might end up sending all N messages to one server or you might end up sending 1 message to N different servers (totally defeating the purpose of batching).

There are two granularities of batching that could make sense: the broker level or the partition level. We do the send requests at the broker level but we do the disk IO at the partition level. I propose making the queues per-partition rather than per broker to avoid having to reshuffle the contents of queues when leadership changes. This could be debated, though.

If you actually look at the byte path of the producer this approach allows cleaning a ton of stuff up. We can do in-place writes to the destination buffer that we will eventually send. This does mean moving serialization and compression to the user thread. But I think this is good as these may be slow but aren't unpredictably slow.

The per-partition queues are thus implemented with a bunch of pre-allocated ByteBuffers sized to max.batch.size, when the buffer is full or the delay time elapses that buffer is sent.
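
The per-partition buffering could look roughly like the sketch below. The PartitionAccumulator name, the 4-byte length prefix per record, and rolling a buffer when the next record does not fit are all assumptions. The time-based flush is not implemented; a sender thread would call flush() when the delay elapses.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical accumulator: one open ByteBuffer per partition, sized in bytes. */
final class PartitionAccumulator {
    private final int maxBatchSize;
    private final Map<Integer, ByteBuffer> open = new HashMap<>();
    private final List<ByteBuffer> ready = new ArrayList<>();

    PartitionAccumulator(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /** Appends a length-prefixed record, rolling the partition's buffer if it would overflow. */
    synchronized void append(int partition, byte[] record) {
        int needed = 4 + record.length;
        if (needed > maxBatchSize) {
            throw new IllegalArgumentException("record larger than max batch size");
        }
        ByteBuffer buf = open.get(partition);
        if (buf != null && buf.remaining() < needed) {
            flush(partition);
            buf = null;
        }
        if (buf == null) {
            buf = ByteBuffer.allocate(maxBatchSize);
            open.put(partition, buf);
        }
        buf.putInt(record.length).put(record);
    }

    /** Marks the partition's current batch as ready to send. */
    synchronized void flush(int partition) {
        ByteBuffer buf = open.remove(partition);
        if (buf != null && buf.position() > 0) {
            buf.flip(); // switch from writing to reading
            ready.add(buf);
        }
    }

    /** Hands all ready batches to the caller (the sender thread in the proposal). */
    synchronized List<ByteBuffer> drain() {
        List<ByteBuffer> out = new ArrayList<>(ready);
        ready.clear();
        return out;
    }
}
```

Since each buffer has a fixed byte size, memory per partition is known up front regardless of how large or small individual messages are.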

By doing this we could actually just reuse the buffers when the send is complete. This would be nice because since the buffers are used for allocation they will likely fall out of young gen.

A question to think about is how we want to bound memory usage. I think what we want is the max.batch.size which controls the size of the individual buffers and total.buffer.memory which controls the total memory used by all buffers. One problem with this is that there is the possibility of some fragmentation. I.e. imagine a situation with 5k partitions being produced to, each getting a low but steady message rate. Giving each of these a 1MB buffer would require 5GB of buffer space to have a buffer for each partition. I'm not sure how bad this is since at least the memory usage is predictable and the reality is that holding thousands of java objects has huge overhead compared to contiguous byte arrays.
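
One way to picture the two limits is a fixed-size buffer pool that reuses buffers and refuses to grow past the total. This is only a sketch: BoundedBufferPool is a hypothetical name, and returning null when the pool is exhausted is an arbitrary choice (blocking the caller would be another).

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical pool bounded by total.buffer.memory, handing out max.batch.size buffers. */
final class BoundedBufferPool {
    private final int bufferSize;
    private final int maxBuffers;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private int allocated = 0;

    BoundedBufferPool(int maxBatchSize, long totalBufferMemory) {
        this.bufferSize = maxBatchSize;
        this.maxBuffers = (int) (totalBufferMemory / maxBatchSize);
    }

    /** Returns a cleared buffer, or null if handing one out would exceed the total bound. */
    synchronized ByteBuffer tryAcquire() {
        ByteBuffer buf = free.pollFirst();
        if (buf != null) return buf;
        if (allocated >= maxBuffers) return null;
        allocated++;
        return ByteBuffer.allocate(bufferSize);
    }

    /** Called when a send completes so the buffer can be reused instead of reallocated. */
    synchronized void release(ByteBuffer buf) {
        buf.clear();
        free.addFirst(buf);
    }

    int maxBuffers() {
        return maxBuffers;
    }
}
```

For the example above, a 1MB batch size with 5GB of total memory gives exactly 5000 buffers, one per partition, with nothing left over for partitions that need a second in-flight batch.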

-Jay

Re: Client improvement discussion

Posted by Jason Rosenberg <jb...@squareup.com>.
Jay,

This seems like a great direction.  Simplifying the consumer client would
be a big win, and +1 for more native java client integration.

On the last point, regarding memory usage for buffering per partition.  I
would think it could be possible to devise a dynamic queuing system, to
allow higher volume partitions to have larger effective buffers than
smaller, low-volume partitions.  Thus, if you reserve a fixed
total.buffer.memory, you could allocate units of buffer space which could
then be composed to make larger buffers (perhaps not necessarily
contiguous).  The long-tail of low-volume partitions could also be moved to
some sort of auxiliary, non-collated buffer space, as they are less likely
to benefit from contiguous buffering anyway.

Fun stuff.

Jason



On Fri, Jul 26, 2013 at 3:00 PM, Jay Kreps <ja...@gmail.com> wrote:

> I sent around a wiki a few weeks back proposing a set of client
> improvements that essentially amount to a rewrite of the producer and
> consumer java clients.
>
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
>
> The below discussion assumes you have read this wiki.
>
> I started to do a little prototyping for the producer and wanted to share
> some of the ideas that came up to get early feedback.
>
> First, a few simple but perhaps controversial things to discuss.
>
> Rollout
> Phase 1: We add the new clients. No change on the server. Old clients still
> exist. The new clients will be entirely in a new package so there will be
> no possibility of name collision.
> Phase 2: We swap out all shared code on the server to use the new client
> stuff. At this point the old clients still exist but are essentially
> deprecated.
> Phase 3: We remove the old client code.
>
> Java
> I think we should do the clients in java. Making our users deal with
> scala's compatibility issues and crazy stack traces causes people a lot
> of pain. Furthermore we end up having to wrap everything now to get a
> usable java api anyway for non-scala people. This does mean maintaining a
> substantial chunk of java code, which is maybe less fun than scala. But
> basically I think we should optimize for the end user and produce a
> standalone pure-java jar with no dependencies.
>
> Jars
> We definitely want to separate out the client jar. There is also a fair
> amount of code shared between both (exceptions, protocol definition, utils,
> and the message set implementation). Two approaches.
> Two jar approach: split kafka.jar into kafka-clients.jar and
> kafka-server.jar with the server depending on the clients. The advantage of
> this is that it is simple. The disadvantage is that things like utils and
> protocol definition will be in the client jar though technically they belong
> equally to the server.
> Many jar approach: split kafka.jar into kafka-common.jar,
> kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
> kafka-server.jar. The disadvantage of this is that the user needs two jars
> (common + something) which is for sure going to confuse people. I also
> think this will tend to spawn more jars over time.
>
> Background threads
> I am thinking of moving both serialization and compression out of the
> background send thread. I will explain a little about this idea below.
>
> Serialization
> I am not sure if we should handle serialization in the client at all.
> Basically I wonder if our own API wouldn't just be a lot simpler if we took
> a byte[] key and byte[] value and let people serialize stuff themselves.
> Injecting a class name for us to create the serializer is more roundabout
> and has a lot of problems if the serializer itself requires a lot of
> configuration or other objects to be instantiated.
>
> Partitioning
> The real question with serialization is whether the partitioning should
> happen on the java object or on the byte array key. The argument for doing
> it on the java object is that it is easier to do something like a range
> partition on the object. The problem with doing it on the object is that
> the consumer may not be in java and so may not be able to reproduce the
> partitioning. For example we currently use Object.hashCode which is a
> little sketchy. We would be better off doing a standardized hash function
> on the key bytes. If we want to give the partitioner access to the original
> java object then obviously we need to handle serialization behind our api.
>
> Names
> I think good names are important. I would like to rename the following
> classes in the new client:
>   Message=>Record: Now that the message has both a message and a key it is
> more of a KeyedMessage. Another name for a KeyedMessage is a Record.
>   MessageSet=>Records: This isn't too important but nit pickers complain
> that it is not technically a Set but rather a List or Sequence but
> MessageList sounds funny to me.
>
> The actual clients will not interact with these classes. They will interact
> with a ProducerRecord and ConsumerRecord. The reason for having different
> fields is because the different clients
> Proposed producer API:
> SendResponse r = producer.send(new ProducerRecord(topic, key, value))
>
> Protocol Definition
>
> Here is what I am thinking about protocol definition. I see a couple of
> problems with what we are doing currently. First the protocol definition is
> spread throughout a bunch of custom java objects. The error reporting in
> these objects is really terrible because they don't record the field names.
> Furthermore people keep adding business logic into the protocol objects
> which is pretty nasty.
>
> I would like to move to having a single Protocol.java file that defines the
> protocol in a readable DSL. Here is what I am thinking:
>
>   public static Schema REQUEST_HEADER =
>
>     new Schema(new Field("api_key", INT16, "The id of the request type."),
>
>                new Field("api_version", INT16, "The version of the API."),
>
>                  new Field("correlation_id", INT32, "A user-supplied
> integer value that will be passed back with the response"),
>
>                  new Field("client_id", STRING, "A user specified
> identifier for the client making the request."));
>
> To parse one of these requests you would do
>    Struct struct = REQUEST_HEADER.parse(bytebuffer);
>    short apiKey = struct.get("api_key");
>
> Internally Struct is just an Object[] with one entry per field which is
> populated from the schema. The mapping of name to array index is a hash
> table lookup. We can optimize access for performance critical areas by
> allowing:
>    static Field apiKeyField = REQUEST_HEADER.getField("api_key"); // do
> this once to lookup the index of the field
>    ...
>    Struct struct = REQUEST_HEADER.parse(bytebuffer);
>    short apiKey = struct.get(apiKeyField); // now this is just an array
> access
>
> One advantage of this is this level of indirection will make it really easy
> for us to handle backwards compatibility in a more principled way. The
> protocol file will actually contain ALL versions of the schema and we will
> always use the appropriate version to read the request (as specified in the
> header).
>
> NIO layer
>
> The plan is to add a non-blocking multi-connection abstraction that would
> be used by both clients.
>
> class Selector {
>   /* create a new connection and associate it with the given id */
>   public void connect(int id, InetSocketAddress address, int sendBufferSize,
> int receiveBufferSize)
>   /* wakeup this selector if it is currently awaiting data */
>   public void wakeup()
>   /* user provides sends, receives, and a timeout. this method will
> populate "completed" and "disconnects" lists. Method blocks for up to the
> timeout waiting for data to read. */
>   public void poll(long timeout, List<Send> sends, List<Send> completed,
> List<Receive> receives, List<Integer> disconnects)
> }
>
> The consumer and producer would then each define their own logic to manage
> their set of in-flight requests.
>
> Producer Implementation
>
> There are a couple of interesting changes I think we can make to the
> producer implementation.
>
> We retain the single background "sender" thread.
>
> But we can remove the definition of sync vs async clients. We always return
> a "future" response immediately. Both sync and async sends would go through
> the buffering that we currently do for the async layer. This would mean
> that even in sync mode while the event loop is doing network IO if many
> requests accumulate they will be sent in a single batch. This effectively
> acts as a kind of "group commit". So instead of having an "async" mode that
> acts differently in some way you just have a max.delay time that controls
> how long the client will linger waiting for more data to accumulate.
> max.delay=0 is equivalent to the current sync producer.
>
> I would also propose changing our buffering strategy. Currently we queue
> unserialized requests in a BlockingQueue. This is not ideal as it is very
> difficult to reason about the memory usage of this queue. One 5MB message
> may be bigger than 10k small messages. I propose that (1) we change our
> queuing strategy to queue per-partition and (2) we directly write the
> messages to the ByteBuffer which will eventually be sent and use that as
> the "queue". The batch size should likewise be in bytes not in number of
> messages.
>
> If you think about it our current queuing strategy doesn't really make
> sense any more now that we always load balance over brokers. You set a
> batch size of N and we do a request when we have N messages in queue but
> this says nothing about the size of the requests that will be sent. You
> might end up sending all N messages to one server or you might end up
> sending 1 message to N different servers (totally defeating the purpose of
> batching).
>
> There are two granularities of batching that could make sense: the broker
> level or the partition level. We do the send requests at the broker level
> but we do the disk IO at the partition level. I propose making the queues
> per-partition rather than per broker to avoid having to reshuffle the
> contents of queues when leadership changes. This could be debated, though.
>
> If you actually look at the byte path of the producer this approach allows
> cleaning a ton of stuff up. We can do in-place writes to the destination
> buffer that we will eventually send. This does mean moving serialization
> and compression to the user thread. But I think this is good as these may
> be slow but aren't unpredictably slow.
>
> The per-partition queues are thus implemented with a bunch of pre-allocated
> ByteBuffers sized to max.batch.size, when the buffer is full or the delay
> time elapses that buffer is sent.
>
> By doing this we could actually just reuse the buffers when the send is
> complete. This would be nice because since the buffers are used for
> allocation they will likely fall out of young gen.
>
> A question to think about is how we want to bound memory usage. I think
> what we want is the max.batch.size which controls the size of the
> individual buffers and total.buffer.memory which controls the total memory
> used by all buffers. One problem with this is that there is the possibility
> of some fragmentation. I.e. imagine a situation with 5k partitions being
> produced to, each getting a low but steady message rate. Giving each of
> these a 1MB buffer would require 5GB of buffer space to have a buffer for
> each partition. I'm not sure how bad this is since at least the memory
> usage is predictable and the reality is that holding thousands of java
> objects has huge overhead compared to contiguous byte arrays.
>
> -Jay
>