Posted to dev@kafka.apache.org by Jay Kreps <ja...@gmail.com> on 2014/02/07 21:56:19 UTC

protocol definition

Okay this is the last discussion item for the new client code. :-)

Previously, to define an api you would implement request and response
scala objects that read and wrote their own bytes. There were a few problems
with this:
1. The consistency of the protocol was very hard to maintain.
2. You ended up hand-coding size estimation, which was very tedious and
error-prone.
3. Error messages wouldn't give any field name information; you would just
get some kind of BufferUnderflowException with no information about what
went wrong or why. Fixing this was hard because each object would have to
implement its own error handling.
4. There wasn't good support for api versioning. We have an api version
that is included in the request, but there was no easy way to maintain both
the old format and the new format.
5. The header information was baked into each request and it was only
through great care that we could keep the header standard throughout the
requests.
6. The same class that defined the protocol was used throughout the code.
So what were intended to be dumb DTOs ended up getting loaded up with
domain logic. Invariably aspects of this representation would end up
leaking into the protocol.
7. It was very hard to figure out what the protocol was from the code since
the definition was embedded in byte munging code spread out over dozens of
files.

So that was definitely bad.

We considered moving to an off-the-shelf protocol definition language like
avro or protocol buffers. But prior experience with these is that they are
great for whipping together a quick service, but for a stable protocol it is
actually better to specify the protocol itself rather than to specify an
implementation like avro or protocol buffers. This is similar to what is
done with AMQP, which I think does a fantastic job of providing a well
specified messaging protocol (that protocol is not suitable for the type of
system we are building, but their method of specifying it I think is very
good).

So the conclusion was to retain our BNF-specified protocol and instead
build a simple library to help implement this protocol. This would have
the advantage of letting us retain our existing protocol and also add a
few Kafka-specific optimizations. This library is just a helper utility for
implementing our protocol spec; the spec remains the source of truth.

I implemented this as part of the new client effort. I will describe how my
library works and the pattern I think we should use with it.

The code for defining the protocol is in
org.apache.kafka.common.protocol.types. Note that this is meant to be a
stand-alone library for serialization; it doesn't know anything about our
actual requests and responses, or even that the messages being defined will
be sent over a network. Our protocol itself is defined in
org.apache.kafka.common.protocol.Protocol; this is just the protocol and is
decoupled from the network layer and everything else.

We define a set of types that match our protocol, namely:
- fixed-length primitives: int8, int16, int32, int64
- variable-length primitives: string, bytes
- container types: arrayof, struct

You define a message using types. All types extend
org.apache.kafka.common.protocol.types.Type.java. Each type knows how to
read, write, validate, and estimate the size of a single java object type.
Here is the correspondence:
  Type.INT8: java.lang.Byte
  Type.INT16: java.lang.Short
  Type.INT32: java.lang.Integer
  Type.INT64: java.lang.Long
  Type.STRING: java.lang.String
  Type.BYTES: java.nio.ByteBuffer
  ArrayOf: Object[]
  Schema: Struct
The correspondence here can be thought of as that between a class and an
object: the class specifies the layout of the object, the object is an
instantiation of that class with particular values. Each message is defined
by a Schema, which can be used to read and write a Struct. The schema
specifies the fields in the type, and the Struct is an "instantiation" of
those fields with actual values. A struct can be thought of as a special
purpose hashmap.

An example will make this more clear. Here is how you define the request
header schema:
   new Schema(new Field("api_key", INT16, "The id of the request type."),
              new Field("api_version", INT16, "The version of the API."),
              new Field("correlation_id", INT32, "documentation string"),
              new Field("client_id", STRING, "more documentation."));

So a request header is a message that consists of a short api key followed
by a short api version followed by a correlation id and client id.

Here is a more complex example, the producer response:

new Schema(new Field("responses", new ArrayOf(new Schema(new Field("topic",
STRING), new Field("partition_responses", new ArrayOf(new Schema(new Field(
"partition", INT32), new Field("error_code", INT16), new Field("base_offset"
, INT64))))))))

(indentation in email is tricky). Note that this has a schema which
contains an array of sub-records which in turn have a sub-array of records.
As this nesting gets more complicated it can get a bit hard to read, so you
can break it up using variables. An equivalent definition would be:

Schema partitionResponse = new Schema(new Field("partition", INT32),
                                      new Field("error_code", INT16),
                                      new Field("base_offset", INT64));

Schema topicResponse = new Schema(new Field("topic", STRING),
                                  new Field("partition_responses",
                                            new ArrayOf(partitionResponse)));

Schema producerResponse = new Schema(new Field("responses",
                                               new ArrayOf(topicResponse)));

Note that this is exactly equivalent.
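
To populate an instance of a nested schema like this, you build the inner
structs and set the arrays as plain Object[] values (per the
ArrayOf: Object[] correspondence above). A rough sketch with made-up values:

Struct partition = new Struct(partitionResponse);
partition.set("partition", 0);
partition.set("error_code", (short) 0);
partition.set("base_offset", 42L);

Struct topic = new Struct(topicResponse);
topic.set("topic", "some-topic");
topic.set("partition_responses", new Object[] { partition });

Struct response = new Struct(producerResponse);
response.set("responses", new Object[] { topic });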

Okay once such a schema is defined you can write an object in the following
way:

Struct header = new Struct(headerSchema);
header.set("api_key", (short) 1);
header.set("api_version", (short) 0);
...
headerSchema.write(buffer, header);
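
As an aside, the buffer in the snippet above has to be allocated somewhere;
since every type can estimate its own size, the allocation can be exact. A
small sketch (sizeOf is my assumed name for the size-estimation method
mentioned earlier; the actual name may differ):

ByteBuffer buffer = ByteBuffer.allocate(headerSchema.sizeOf(header)); // assumed method name
headerSchema.write(buffer, header);
buffer.flip(); // ready to hand to the network layer or read back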

And you can read an instance of a header by doing:

Struct header = headerSchema.read(buffer);
Short apiKey = (Short) header.get("api_key");

Field apiVersionField = header.field("api_version");
Short apiVersion = (Short) header.get(apiVersionField);

Note the two different field access styles. Accessing a field by name has
the performance of a hash table lookup. However, for performance-critical
situations you can get the Field object that represents that entry in the
struct. Getting this Field object takes a hash table lookup, but once you
have it, it will get that field out of any struct with that schema with the
performance of an array access. So this is useful in cases where you can
statically fetch all the fields and then use them on every request (assuming
you need to optimize for performance).
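
Concretely, the pattern looks something like this, using only the accessors
shown above (look the Field objects up once, then reuse them on every
subsequent struct with this schema):

// one-time setup: one hash table lookup per field
Field apiKeyField = header.field("api_key");
Field apiVersionField = header.field("api_version");

// hot path: for any struct with this schema, access costs an array index
Short apiKey = (Short) header.get(apiKeyField);
Short apiVersion = (Short) header.get(apiVersionField);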

These raw structs are logic-free and act as the "DTO" for data that will be
sent over the network.

For the more complex requests and responses, interacting with the raw struct
is not very pleasant. My recommendation is that we still maintain a java
object that is the "domain object" for the request and knows how to read
and write itself to the struct. This is what you would end up passing down
into KafkaApis. This will have all the convenience methods that people were
wanting to add to the protocol objects before. The downside of this is that
in some ways you define the request twice, but I think both of these layers
are actually needed and would evolve independently (the struct only when
the protocol changes, and the domain object with the needs of the code that
uses it). I haven't actually done this in the producer yet, in part because I
think that to make these domain objects properly you need to use them on the
server side too, which we aren't ready for yet. However I did add a version
of this for metadata on KAFKA-1238 here:

https://issues.apache.org/jira/secure/attachment/12627654/KAFKA-1238-v1.patch
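
To make the idea concrete, a domain object for the request header defined
earlier might look roughly like the following. This is an illustrative
sketch, not the class from the patch; the class name and the exact Struct
accessors are assumptions:

import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;

// Hypothetical sketch only; not the class in the KAFKA-1238 patch.
public class RequestHeader {
    private final short apiKey;
    private final short apiVersion;
    private final int correlationId;
    private final String clientId;

    public RequestHeader(short apiKey, short apiVersion, int correlationId, String clientId) {
        this.apiKey = apiKey;
        this.apiVersion = apiVersion;
        this.correlationId = correlationId;
        this.clientId = clientId;
    }

    // object-to-struct: only this mapping changes when the protocol changes
    public Struct toStruct(Schema headerSchema) {
        Struct struct = new Struct(headerSchema);
        struct.set("api_key", apiKey);
        struct.set("api_version", apiVersion);
        struct.set("correlation_id", correlationId);
        struct.set("client_id", clientId);
        return struct;
    }

    // struct-to-object: this is the object that would get passed into KafkaApis
    public static RequestHeader fromStruct(Struct struct) {
        return new RequestHeader((Short) struct.get("api_key"),
                                 (Short) struct.get("api_version"),
                                 (Integer) struct.get("correlation_id"),
                                 (String) struct.get("client_id"));
    }

    // plus whatever accessors and convenience methods the calling code needs
}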

Okay, it would be great to get feedback on this code and this general
approach to protocol definition. If everyone likes it then I am going to
consider all the discussion items for the new code wrapped up and move on
to the more detailed code review and testing.

-Jay

Re: protocol definition

Posted by Jun Rao <ju...@gmail.com>.
Just some minor comments.

1. Struct is not a type. So, it probably should be in the protocol package,
instead of protocol.types.

2. Is the comment in Struct.instance() outdated? It seems this is only used
for creating an instance of a Field of type struct.

3. When creating a Field in a schema, it's a bit weird that we first create
a field using
 Field(String name, Type type)
only to later recreate it in Schema using
 Field(int index, String name, Type type, String doc, Object defaultValue,
Schema schema)

An alternative way is something like the following:
new Schema().addField("f1", Int).addField("f2", Long)

This would avoid the double creation of Field. Not sure if it's better.

4. With this approach, refactoring field names will be harder since there
won't be IDE support.

Thanks,

Jun



Re: protocol definition

Posted by Guozhang Wang <wa...@gmail.com>.
Hey Jay,

3,4. What I meant is that you still need to set/get the field values
by their field names:

// in toStruct
body.set("topics", topics.toArray());

// in toMetadataResponse
int nodeId = (Integer) broker.get("node_id");

And I was proposing just the opposite of your understanding: after the
protocol file is defined, auto-generate the toStruct and parseRequest
functions; if this cannot be done I would rather require programmers to
use only the Field object references and not allow field names, since
string values may be reused for other fields during a version change.

Guozhang






Re: protocol definition

Posted by Jay Kreps <ja...@gmail.com>.
If what you are saying is that we should define the java code and kind of
"automatically" generate the protocol off of that, then I would be against
that, as I really want the protocol definition to be the source of truth. It
should be 100% obvious when you are changing the protocol.

-Jay



Re: protocol definition

Posted by Jay Kreps <ja...@gmail.com>.
Hey Guozhang,

1,2. Yes, you need to define the new format and add it to the list of
versions.
3. Not necessarily. It depends on the change. If the formats are totally
different the code can take the version and just have some if statement to
read one or the other. But most changes are just the simple addition or
removal of fields. In this case the schema DSL supports default values that
will be filled in to help ease backwards compatibility for simple cases
(see the sketch after this list).
4. For the client, presumably you always use the latest version, so the
existing logic new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id))
should suffice?
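
Here is the kind of thing I mean for 3, as a rough sketch only (I am
assuming a Field constructor that takes a doc string and a default value;
the exact signature may differ, and the schemas below are made up rather
than the real metadata ones):

Schema fooResponseV0 = new Schema(new Field("error_code", INT16),
                                  new Field("base_offset", INT64));

// V1 adds a field; the default is filled in when the field is absent,
// which is what eases backwards compatibility for simple changes.
Schema fooResponseV1 = new Schema(new Field("error_code", INT16),
                                  new Field("base_offset", INT64),
                                  new Field("log_append_time", INT64,
                                            "Time the entry was appended.", -1L));

Schema[] fooResponseVersions = new Schema[] { fooResponseV0, fooResponseV1 };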

I'm not sure if I know what it is you are proposing. Care to elaborate?

-Jay



Re: protocol definition

Posted by Guozhang Wang <wa...@gmail.com>.
Just to clarify, in this case when we are evolving, say, MetadataResponse to
version 1, we will:

1. In Protocol.java, add another variable METADATA_RESPONSE_V1

2. In Protocol.java, change this line to:

public static Schema[] METADATA_RESPONSE = new Schema[] {
METADATA_RESPONSE_V1 };

3. In MetadataResponse.java, change the bytes-to-object function (i.e.,
MetadataResponse(Struct struct)) to have parsing logic for both V0 and V1.

4. In MetadataResponse.java, change the object-to-bytes function (i.e.,
Struct toStruct() ) to V1.

Is this correct?

My feeling is that it would be good if we did not have to repeat field
names like "topics" in multiple places (Protocol and MetadataResponse, for
example), and although toStruct() only needs to maintain the logic for
the latest version, MetadataResponse(Struct struct) would probably need to
handle all versions, which will make it very complicated. So I am wondering
whether these two functions can be provided automatically. I am not advocating
for another round of compilation, but I would like to see if it is possible
for these procedures to be done in a more programmable way.

Guozhang




Re: protocol definition

Posted by David Arthur <mu...@gmail.com>.
On 2/8/14 11:40 AM, Jay Kreps wrote:
> In general I think we have tried to make the protocol consistent. The one
> exception might be some details of the message set itself. This is handled
> by custom code still in the new producer as we pack these structures ahead
> of time.
I agree that it is very consistent right now; I just recall getting
tripped up by the message set (as did other people implementing the
protocol).
>
> There isn't a way to ask for the latest api version. We could add an api
> for that. In general the intention is that the clients support only a
> single version of the protocol (for simplicity) and the server supports as
> many versions as is practical for compatibility. So there isn't too much
> value in the client querying the version; instead it can just send
> requests, and if the server can't support the version it is built with then
> it is kind of SOL anyway.
Consider the protocol difference between 0.8.0 and 0.8.1 - the only diff is
the new offsets APIs. A client will want to support users of both versions,
but offset committing in the client should be disabled (or ignored) if
talking to 0.8.0. It's a much better client development/user experience if
features can be proactively turned off rather than discovered through fault
handling. Sure, users could provide this, but so could the client if a
version API existed.
>
> Yeah I kind of pondered whether we should be lazy or not in the wrappers. I
> think it is more straightforward to not be lazy, and since we will generally
> read all the fields it should be faster to just traverse the struct and read
> them all at once.
>
> Yeah the GC impact and performance in general is a concern.
>
> I agree that parsing the struct is itself kind of tricky. Overall I was not
> as happy with the outcome here as I had hoped. I think adding good helper
> methods in Struct could potentially help with that (as you suggest). As
> would making the arrays have the proper type instead of all being Object[].
>
> Definitely decomposing the logic would help make it a little more
> approachable. Part of the problem is just that a few things like
> MetadataResponse and ProduceRequest are themselves fairly complex, and the
> mapping from the domain objects is complex as well (e.g. the domain objects
> don't have the flattening by topic/partition). This complexity was in the
> original objects too, but there it was broken into a bunch of classes, which
> kind of helped.
>
> In general for supporting large messages there are two things you can do.
> The first is to be very careful with byte copying, which we have tried to do
> (and do much better in the new common/client code). This will get to
> reasonable message sizes (10s of MBs) very happily. Some people have asked
> for arbitrary sizes. In this model your message value would be a stream and
> would be lazily streamed to the network, with the server lazily streaming
> it to disk. This is something very hard to support and would dramatically
> change our design from top to bottom (client, server, protocol, etc). The
> client and server would have to initiate network I/O with incomplete
> messages and the size delimiting used in the protocol would no longer be
> possible. This is such a big change and so complex to get right and so
> rarely needed for the things Kafka is good for that I have been loath to really
> pursue it. I think in this case what you can usually do is store the blob
> elsewhere and just log the metadata and a reference to it.
10s of MB sounds a lot better than I thought it was.
>
> Your point about using a text bnf is a good one and would certainly be more
> readable. I guess in our approach I kind of feel the specification (i.e.
> protocol wiki) is actually the "source of truth". The java code is just our
> implementation of that spec and making it declarative is just to help keep
> it readable. That is to say, if someone checked in a change to the
> Protocol.java file that broke compatibility with the spec, that would be
> (philosophically) a bug, not a new protocol. But your idea of generating a
> BNF from the java code just for debugging/documentation is actually really
> good.
I looked around and didn't see much, but surely there are tools for 
working with BNF or a similar format?
>
> -Jay


Re: protocol definition

Posted by Jay Kreps <ja...@gmail.com>.
In general I think we have tried to make the protocol consistent. The one
exception might be some details of the message set itself. This is handled
by custom code still in the new producer as we pack these structures ahead
of time.

There isn't a way to ask for the latest api version. We could add an api
for that. In general the intention is that the clients support only a
single version of the protocol (for simplicity) and the server supports as
many versions as is practical for compatibility. So there isn't too much
value in the client querying the version; instead it can just send
requests, and if the server can't support the version it is built with then
it is kind of SOL anyway.
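
To illustrate, the server side ends up looking roughly like this (a fragment
and a sketch only; the REQUEST_HEADER constant and the per-api, per-version
schema table are hypothetical names, but the header fields are the ones from
the schema in my first mail):

// Rough sketch of version dispatch on the server. REQUEST_HEADER and the
// REQUEST_SCHEMAS[apiKey][apiVersion] lookup table are assumed names.
public static Struct parseRequest(ByteBuffer buffer) {
    Struct header = REQUEST_HEADER.read(buffer);
    short apiKey = (Short) header.get("api_key");
    short apiVersion = (Short) header.get("api_version");
    // Pick the schema matching the version the client sent; if we don't have
    // one, the client is out of luck, which is why it doesn't need to ask.
    Schema requestSchema = REQUEST_SCHEMAS[apiKey][apiVersion];
    return requestSchema.read(buffer);
}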

Yeah I kind of pondered whether we should be lazy or not in the wrappers. I
think it is more straightforward to not be lazy, and since we will generally
read all the fields it should be faster to just traverse the struct and read
them all at once.

Yeah the GC impact and performance in general is a concern.

I agree that parsing the struct is itself kind of tricky. Overall I was not
as happy with the outcome here as I had hoped. I think adding good helper
methods in Struct could potentially help with that (as you suggest). As
would making the arrays have the proper type instead of all being Object[].
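
To make that concrete, the kind of helper I have in mind would be something
like this (purely hypothetical; nothing like it exists in Struct today):

// Hypothetical method on Struct: hides the Object[] cast and hands back
// properly typed sub-structs for a nested/repeating field.
public Struct[] getStructArray(String field) {
    Object[] raw = (Object[]) this.get(field);
    Struct[] structs = new Struct[raw.length];
    for (int i = 0; i < raw.length; i++)
        structs[i] = (Struct) raw[i];
    return structs;
}

With that, the wrapper code could iterate with for (Struct topic :
struct.getStructArray("responses")) instead of casting by hand at each level.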

Definitely decomposing the logic would help make it a little more
approachable. Part of the problem is just that a few things like
MetadataResponse and ProduceRequest are themselves fairly complex, and the
mapping from the domain objects is complex as well (e.g. the domain objects
don't have the flattening by topic/partition). This complexity was in the
original objects too, but there it was broken into a bunch of classes, which
kind of helped.

In general for supporting large messages there are two things you can do.
The first is to be very careful with byte copying, which we have tried to do
(and do much better in the new common/client code). This will get to
reasonable message sizes (10s of MBs) very happily. Some people have asked
for arbitrary sizes. In this model your message value would be a stream and
would be lazily streamed to the network, with the server lazily streaming
it to disk. This is something very hard to support and would dramatically
change our design from top to bottom (client, server, protocol, etc). The
client and server would have to initiate network I/O with incomplete
messages and the size delimiting used in the protocol would no longer be
possible. This is such a big change and so complex to get right and so
rarely needed for the things Kafka is good for that I have been loath to really
pursue it. I think in this case what you can usually do is store the blob
elsewhere and just log the metadata and a reference to it.
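
In code the reference approach is basically the following (a sketch; the two
interfaces are stand-ins defined just to keep the example self-contained, not
Kafka APIs):

// Sketch of "store a reference, not the blob". BlobStore and RecordSender
// are stand-ins for external storage and for whatever producer API you use.
interface BlobStore { String put(byte[] data); }
interface RecordSender { void send(String topic, byte[] value); }

class LargePayloadExample {
    // Put the big payload in external storage and log only a small pointer.
    static String produce(BlobStore store, RecordSender sender,
                          String topic, byte[] hugePayload) {
        String blobId = store.put(hugePayload);
        String reference = "{\"blob_id\":\"" + blobId + "\",\"size\":"
                           + hugePayload.length + "}";
        sender.send(topic, reference.getBytes());
        return blobId;
    }
}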

Your point about using a text bnf is a good one and would certainly be more
readable. I guess in our approach I kind of feel the specification (i.e.
protocol wiki) is actually the "source of truth". The java code is just our
implementation of that spec and making it declarative is just to help keep
it readable. That is to say, if someone checked in a change to the
Protocol.java file that broke compatibility with the spec, that would be
(philosophically) a bug, not a new protocol. But your idea of generating a
BNF from the java code just for debugging/documentation is actually really
good.
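
Something like this would probably get us most of the way (a sketch only; it
assumes a fields() accessor on Schema and public name/type members on Field,
which may not match the actual code):

// Rough sketch of dumping a Schema as a BNF-ish production for the docs.
// The fields() accessor and the name/type members are assumed, not real.
public static String toBnf(Schema schema) {
    StringBuilder bnf = new StringBuilder();
    for (Field field : schema.fields()) {
        if (bnf.length() > 0)
            bnf.append(" ");
        bnf.append(field.name);
        // Nested schemas (and ArrayOf element types, with a little more
        // work) would be expanded recursively into their own productions.
        if (field.type instanceof Schema)
            bnf.append(" => { ").append(toBnf((Schema) field.type)).append(" }");
    }
    return bnf.toString();
}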

-Jay



Re: protocol definition

Posted by David Arthur <mu...@gmail.com>.
Jay,

This looks very cool, and will certainly make writing future APIs more 
sane and maintainable.

Few questions/comments:

* There's one part of the current API (I forget where) that does not use
a length-prefixed repeating element; how would (or would you) represent
this using Schema? IMO, we should be consistent and always use length
prefixing for variable-length things.
* I've been meaning to ask for a while, but will there be a way to
interrogate Kafka for the current API version(s)? Clients will
definitely need this.
* Looks like Schema#read eagerly reads values; do you think there is any
reason why you'd want to do this lazily?
* Now that Struct is in the mix, it seems we have 2x objects. We should
keep an eye on the GC impact of this.
* Using Struct, which is more of a dynamic thing, the conversion to a
typed response class (like MetadataResponse) looks a little hairy and
probably error-prone. Maybe it just needs to be decomposed a bit, or maybe
Struct could return a "sub-Struct" for nested/repeating elements so
parsing logic could be organized better (see the sketch below).
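
To show what I mean by decomposing it, something like this (a sketch reusing
the field names from the producer response example, not actual code):

// Sketch only: break the Struct-to-domain-object conversion into one small
// method per nesting level, using the producer response field names.
static void readResponses(Struct response) {
    for (Object topicObj : (Object[]) response.get("responses"))
        readTopicResponse((Struct) topicObj);
}

static void readTopicResponse(Struct topicResponse) {
    String topic = (String) topicResponse.get("topic");
    for (Object partitionObj : (Object[]) topicResponse.get("partition_responses"))
        readPartitionResponse(topic, (Struct) partitionObj);
}

static void readPartitionResponse(String topic, Struct partition) {
    Integer partitionId = (Integer) partition.get("partition");
    Short errorCode = (Short) partition.get("error_code");
    Long baseOffset = (Long) partition.get("base_offset");
    System.out.println(topic + "-" + partitionId + ": error=" + errorCode
                       + ", offset=" + baseOffset);
}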

While we are looking at protocols, maybe we should think about what we 
could do to make Kafka more amenable to large payloads. I'm not sure how 
we could do that with a nice abstraction like you have, but it's worth 
thinking about.

As for storing the protocol definitions, it would be really sweet if we
could keep them in a text form (something like BNF, if not just BNF
itself). This would allow clients in other languages to parse it and
generate their protocol code automatically. If a text form isn't the source
of truth, maybe Schema could simply generate a BNF text file.

Again, this is really cool and could definitely be its own standalone
library :)

-David

