Posted to dev@kafka.apache.org by "David Arthur (JIRA)" <ji...@apache.org> on 2013/01/09 19:40:13 UTC

[jira] [Commented] (KAFKA-643) Refactor api definition layer

    [ https://issues.apache.org/jira/browse/KAFKA-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548797#comment-13548797 ] 

David Arthur commented on KAFKA-643:
------------------------------------

+1 for splitting generic/specific parts of the API (this is basically what I do in my Python client).

+1 for specifying the protocol in a ~BNF form. This would require the protocols to be expressible as LL grammars (which they all are), and that property is needed for efficient ByteBuffer packing/unpacking anyway.

However, how would this scheme handle recursive definitions (like MessageSet)? I've always felt the depth of this should be limited to one, meaning a single Message can contain a compressed MessageSet which can only be composed of regular (uncompressed) Messages. In https://github.com/mumrah/kafka-python/blob/master/kafka/client.py#L355, I have to endlessly recurse to ensure I've fully consumed the messages - kind of a pain. If the depth was limited, I could decode it non-recursively. 
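
For illustration, here is a rough sketch (in Scala, with a made-up Message shape and a caller-supplied decode function rather than real client code) of how a depth-one limit would let a consumer flatten a message set with two plain loops instead of unbounded recursion:

    import java.nio.ByteBuffer
    import scala.collection.mutable.ListBuffer

    // Hypothetical message shape for illustration only -- not the real client classes.
    // For a compressed message, assume `payload` already holds the decompressed
    // bytes of the nested message set.
    case class Message(payload: Array[Byte], compressed: Boolean)

    // decodeOne parses a single wire-level message from the buffer (supplied by the caller).
    def flattenMessageSet(buffer: ByteBuffer, decodeOne: ByteBuffer => Message): List[Message] = {
      val out = ListBuffer[Message]()
      while (buffer.hasRemaining) {
        val outer = decodeOne(buffer)
        if (outer.compressed) {
          // Depth limited to one: the nested set may contain only plain messages,
          // so a single inner loop is enough.
          val inner = ByteBuffer.wrap(outer.payload)
          while (inner.hasRemaining) out += decodeOne(inner)
        } else {
          out += outer
        }
      }
      out.toList
    }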

+0 for not using Avro et al. I understand the performance implications of using one of these frameworks, but it sure does make client development easier. However, as long as the protocol spec is clear (and correct), implementing a client is not so bad.

What about the Java API? As far as I can tell, the purpose of these classes is to delegate to the real APIs and handle the Java -> Scala data type conversion. It seems like that could be made automatic. Although I guess the Java classes have to be present for the implicits to work.

I know it's very new (Scala 2.10) and experimental, but macros might help in simplifying the APIs: http://docs.scala-lang.org/overviews/macros/overview.html.
                
> Refactor api definition layer
> -----------------------------
>
>                 Key: KAFKA-643
>                 URL: https://issues.apache.org/jira/browse/KAFKA-643
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.8.1
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>
> The way we are defining our protocol is really a bit embarrassing. It is full of ad hoc serialization code for each API. This code is very fiddly and opaque, and when it has errors they are hard to debug. Since it is all done one-off, it is also very easy for it to become inconsistent. This was tolerable when there were only two APIs with a few fields each, but now there are a half dozen more complex APIs. By my count there are now over 1000 lines of code in kafka.apis.*.
> One option would be to use Protocol Buffers or Thrift or another schema-oriented, code-gen RPC language. However, I think this is probably the wrong direction for a couple of reasons. One is that we want something that works well with our I/O model, both network and disk, which is very NIO-centric, so it should work directly with ByteBuffers. Second, I feel that these systems complicate the specification of the protocol. They give a schema, which is a great high-level description, but the translation of that to bytes is essentially whatever their code-gen engine chooses to do. These things are a great way to build application services, but not great for something like what we are building.
> Instead I think we should keep doing what we have done: specify the protocol on the wiki. However, we should write a little helper code to make our lives easier.
> Here is my recommendation for how this code would work. We add two helper classes: Schema and Record.
> You define message formats like this:
> import Types._
> val FetchRequestProtocol = 
>   Schema("ReplicaId" -> int32,
>          "MaxWaitTime" -> int32,
>          "MinBytes" -> int32,
>          Seq("TopicName" -> utf8,
>              Seq("Partition" -> int32,
>                  "FetchOffset" -> int64,
>                  "MaxBytes" -> int32)))
> Note that this almost exactly matches the BNF for the fetch request: 
>   https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
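> For concreteness, the primitives imported from Types might look roughly like this (a sketch only; the exact signatures and wire encodings are still to be decided, and strings are shown here as a 2-byte length followed by UTF-8 bytes):
>   import java.nio.ByteBuffer
>   import java.nio.charset.Charset
>   trait Type {
>     def read(buffer: ByteBuffer): Any
>     def write(buffer: ByteBuffer, obj: Any): Unit
>     def sizeOf(obj: Any): Int
>   }
>   object Types {
>     private val Utf8 = Charset.forName("UTF-8")
>     val int32: Type = new Type {
>       def read(buffer: ByteBuffer): Any = buffer.getInt
>       def write(buffer: ByteBuffer, obj: Any): Unit = { buffer.putInt(obj.asInstanceOf[Int]) }
>       def sizeOf(obj: Any): Int = 4
>     }
>     val int64: Type = new Type {
>       def read(buffer: ByteBuffer): Any = buffer.getLong
>       def write(buffer: ByteBuffer, obj: Any): Unit = { buffer.putLong(obj.asInstanceOf[Long]) }
>       def sizeOf(obj: Any): Int = 8
>     }
>     // Strings sketched as a 2-byte length followed by the UTF-8 bytes.
>     val utf8: Type = new Type {
>       def read(buffer: ByteBuffer): Any = {
>         val len = buffer.getShort.toInt
>         val bytes = new Array[Byte](len)
>         buffer.get(bytes)
>         new String(bytes, Utf8)
>       }
>       def write(buffer: ByteBuffer, obj: Any): Unit = {
>         val bytes = obj.asInstanceOf[String].getBytes(Utf8)
>         buffer.putShort(bytes.length.toShort)
>         buffer.put(bytes)
>       }
>       def sizeOf(obj: Any): Int = 2 + obj.asInstanceOf[String].getBytes(Utf8).length
>     }
>   }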
> Once defined, this schema can be used to parse messages:
>   val record: Record = FetchRequestProtocol.readFrom(buffer)
> A record is just a wrapper around an array. The readFrom method parses out the fields specified in the schema and populates the array. Fields in the record can be accessed by name, e.g. 
>   record("ReplicaId")
> For common access this is probably good enough. However, since each field's position is fixed, it is also possible to get the element by a Field object, which skips the hashmap lookup and goes directly to the right slot. E.g.
>   val ReplicaIdField = FetchRequestProtocol("ReplicaId") // do this as a global variable
>   ...
>   record(ReplicaIdField)
> This will be for cases where we are a bit performance conscious and don't want to do umpteen hashmap lookups to resolve string field names.
> Likewise the other direction, to write out a record:
>   record.writeTo(buffer)
> and to get the size in bytes:
>   record.size
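> To make the whole read/write/size round trip concrete, here is a rough sketch of Schema and Record for flat schemas (nested sequences like the fetch request's topic/partition arrays, versioning, and error handling are omitted; it builds on the Type sketch above and is only meant to show the shape of the API, not to be the final implementation):
>   import java.nio.ByteBuffer
>   case class Field(index: Int, name: String, fieldType: Type)
>   class Schema(specs: (String, Type)*) {
>     val fields: Array[Field] =
>       specs.zipWithIndex.map { case ((name, t), i) => Field(i, name, t) }.toArray
>     private val byName: Map[String, Field] = fields.map(f => f.name -> f).toMap
>     // Look a Field up once (e.g. in a global) to skip per-access hashmap lookups.
>     def apply(name: String): Field = byName(name)
>     def readFrom(buffer: ByteBuffer): Record =
>       new Record(this, fields.map(f => f.fieldType.read(buffer)))
>   }
>   object Schema {
>     def apply(specs: (String, Type)*): Schema = new Schema(specs: _*)
>   }
>   class Record(val schema: Schema, values: Array[Any]) {
>     // By-name access: one hashmap lookup to find the slot.
>     def apply(name: String): Any = values(schema(name).index)
>     // By-Field access: goes straight to the slot.
>     def apply(field: Field): Any = values(field.index)
>     def writeTo(buffer: ByteBuffer): Unit =
>       schema.fields.foreach(f => f.fieldType.write(buffer, values(f.index)))
>     def size: Int =
>       schema.fields.map(f => f.fieldType.sizeOf(values(f.index))).sum
>   }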
> Implementing a single read, write, and size method driven by generic schemas will not only make the underlying protocol clearly defined but also ensure consistent error handling, error reporting, etc. It will be a bit slower, though maybe not by much, since we can optimize this code.
> I do realize that this is essentially what Avro or Thrift or ProtocolBuffers do, but I think this is much simpler, and can be implemented in a few hundred lines of code with no dependencies. Furthermore it is a way to implement our protocol, not a way to define a protocol.
> In terms of how we use this, this is what I have in mind:
> I think we should split the APIs into a generic portion and a specific portion, with the generic piece being the header shared by all requests and responses and the specific portion being the bits for that particular message. I recommend we officially implement versioning by allowing multiple versions of the schemas and always looking up the right schema for each incoming and outgoing message. I think we can keep the existing case classes and just map the Scala objects to and from the record instances in a wrapper layer in front of the existing KafkaApis. The KafkaApis.handle method would disappear; instead this wrapper would handle message deserialization and call the right handler method with the right request object. A rough sketch of the versioned lookup and dispatch is below.
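> As a rough sketch of the versioned schema lookup and the wrapper's dispatch (the api key and version numbers here are placeholders, and the shared header is reduced to just those two fields for illustration):
>   import java.nio.ByteBuffer
>   object ProtocolSchemas {
>     // One schema per (apiKey, version) pair; the key/version values are placeholders.
>     private val requestSchemas: Map[(Short, Short), Schema] = Map(
>       (1.toShort, 0.toShort) -> FetchRequestProtocol
>     )
>     def requestSchema(apiKey: Short, version: Short): Schema =
>       requestSchemas.getOrElse((apiKey, version),
>         throw new IllegalArgumentException(
>           "Unknown api key " + apiKey + " version " + version))
>   }
>   // The wrapper in front of the existing KafkaApis: decode the shared header,
>   // look up the schema for that api key and version, then map the Record onto
>   // the existing request case class and call the matching handler method.
>   def handle(buffer: ByteBuffer): Unit = {
>     val apiKey = buffer.getShort    // generic portion, shared by all requests
>     val version = buffer.getShort
>     val record = ProtocolSchemas.requestSchema(apiKey, version).readFrom(buffer)
>     // e.g. for a fetch request: build the FetchRequest case class from `record`
>     // and call the corresponding method on KafkaApis.
>   }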
