Posted to users@kafka.apache.org by Joel Koshy <jj...@gmail.com> on 2012/03/30 21:21:26 UTC

API change in consumer

Hi all,

This is a follow-up to the email I sent out a few days ago on the
consumer API extension as part of KAFKA-249. After the code review, a
larger API change may be more suitable, so here is an overview.

The new method in the consumer connector that supports wildcarding (in the
v2 patch) returns a list of KafkaMessageAndTopicStream[T] objects.  There
were a couple of comments on this:

- It is (somewhat oddly) different from the existing API
  (createMessageStreams which returns a map containing KafkaMessageStream[T]
  objects)
- We already have a MessageAndOffset class, and at some point we may want to
  give consumers access to logical partition/offset information.

So this would be an opportunity to fix the consumer API to accommodate a more
general consumer stream and iterator API that provides access to
MessageAndMetadata elements, each of which contains the message + metadata
(such as topic, offset, partition, etc.).

So I have incorporated this in a new patch (which I will upload soon after I
address all the other review comments), and I wanted to share the API
changes here since it is a more significant change that would require users
of the consumer and iterator to update their code.

--------------------------------------------------------------------------------

Proposal for the new ConsumerConnector API:

  /**
   *  Create a list of MessageStreams for each topic.
   *
   *  @param topicCountMap  a map of (topic, #streams) pair
   *  @param decoder Decoder to decode each Message to type T
   *  @return a map of (topic, list of KafkaMessageAndMetadataStream) pairs.
   *          The number of items in the list is #streams. Each stream
   *          supports an iterator over message/metadata pairs.
   */
  def createMessageStreams[T](topicCountMap: Map[String,Int],
                              decoder: Decoder[T] = new DefaultDecoder)
    : Map[String,List[KafkaMessageAndMetadataStream[T]]]


  /**
   *  Create a list of message streams for all topics that match a given
   *  filter.
   *
   *  @param filterSpec Either a Whitelist or Blacklist TopicFilterSpec
   *                    object.
   *  @param numStreams Number of streams to return
   *  @param decoder Decoder to decode each Message to type T
   *  @return a list of KafkaMessageAndMetadataStream each of which
   *          provides an iterator over message/metadata pairs over
   *          allowed topics.
   */
  def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec,
                                      numStreams: Int = 1,
                                      decoder: Decoder[T] = new DefaultDecoder)
    : Seq[KafkaMessageAndMetadataStream[T]]
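
To make the whitelist/blacklist idea behind filterSpec a bit more concrete,
here is a minimal sketch. The real TopicFilterSpec is not defined in this
email, so everything below is a hypothetical stand-in: the isTopicAllowed
method, the regex field, and the allowedTopics helper are assumptions made
for illustration, using plain java.lang.String regex matching.

```scala
object TopicFilterSketch {
  // Hypothetical stand-in for TopicFilterSpec; not the class from the patch.
  sealed trait TopicFilterSpec {
    def regex: String
    def isTopicAllowed(topic: String): Boolean
  }

  // Allows only topics whose full name matches the regex.
  case class Whitelist(regex: String) extends TopicFilterSpec {
    def isTopicAllowed(topic: String): Boolean = topic.matches(regex)
  }

  // Allows every topic except those whose full name matches the regex.
  case class Blacklist(regex: String) extends TopicFilterSpec {
    def isTopicAllowed(topic: String): Boolean = !topic.matches(regex)
  }

  // Hypothetical helper: the subset of known topics a filter would select.
  def allowedTopics(all: Seq[String], spec: TopicFilterSpec): Seq[String] =
    all.filter(spec.isTopicAllowed)

  def main(args: Array[String]): Unit = {
    val topics = Seq("events.click", "events.view", "audit")
    println(allowedTopics(topics, Whitelist("events\\..*")))
    println(allowedTopics(topics, Blacklist("audit")))
  }
}
```

With a shape like this, a single filter object decides which topics feed the
numStreams streams, which is why the filter-based method returns a flat list
of streams instead of a per-topic map.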

--------------------------------------------------------------------------------

The KafkaMessageAndMetadataStream[T]'s iterator is a ConsumerIterator[T]
which is an iterator over MessageAndMetadata[T] objects:

case class MessageAndMetadata[T](message: T,
                                 topic: String = "",
                                 offset: Long = -1L)

Although the MessageAndMetadata class is simple, it needs to be evolved
carefully (i.e., adding fields is easy, but removing fields would
effectively break older clients at compile time).  I think it would be
better to avoid schemas and/or explicit versioning, since that would make
writing the client-side code more difficult.
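
As a rough illustration of the "adding fields is easy" point, the sketch
below puts two versions of the class side by side. The V1/V2 suffixes and
the partition field are hypothetical and only serve the illustration; they
are not part of the patch.

```scala
object MetadataEvolution {
  // The element type as proposed in this email.
  case class MessageAndMetadataV1[T](message: T,
                                     topic: String = "",
                                     offset: Long = -1L)

  // Hypothetical later version: a new field appended with a default value.
  case class MessageAndMetadataV2[T](message: T,
                                     topic: String = "",
                                     offset: Long = -1L,
                                     partition: Int = -1)

  // Client code that only constructs with defaults and reads fields by name
  // keeps compiling unchanged when a defaulted field is appended.
  def describe(m: MessageAndMetadataV2[String]): String =
    s"${m.topic}@${m.offset}: ${m.message}"

  // By contrast, a positional pattern such as
  //   case MessageAndMetadataV1(msg, topic, offset) => ...
  // would stop compiling against V2, because the extractor arity changes.

  def main(args: Array[String]): Unit = {
    val m = MessageAndMetadataV2("hello", "events", 42L)
    println(describe(m))
  }
}
```

Note that this is about source compatibility only. Appending a field changes
the constructor signature, so clients would still need to be recompiled
against the new class.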

--------------------------------------------------------------------------------

This means the current pattern of:

for (message <- stream) {
  // process(message)
}

will change to:

for (msgAndMetadata <- stream) {
  // processMessage(msgAndMetadata.message)
  // can also access msgAndMetadata.offset, topic, etc. if appropriate
}
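
For code that only cares about the payload, one way to limit the churn would
be a thin adapter that projects each element back to its message. This is
just a sketch: it uses a local copy of the case class, an in-memory Iterator
standing in for the stream's ConsumerIterator, and hypothetical helper names
(process, messagesOnly).

```scala
object MigrationSketch {
  // Local copy of the proposed element type, for a self-contained example.
  case class MessageAndMetadata[T](message: T,
                                   topic: String = "",
                                   offset: Long = -1L)

  // Existing message-only processing logic, left unchanged.
  def process(message: String): String = message.toUpperCase

  // Hypothetical adapter: drop the metadata and keep only the message.
  def messagesOnly[T](it: Iterator[MessageAndMetadata[T]]): Iterator[T] =
    it.map(_.message)

  def main(args: Array[String]): Unit = {
    // In-memory stand-in for a consumer stream.
    val stream = Iterator(
      MessageAndMetadata("a", "events", 0L),
      MessageAndMetadata("b", "events", 1L))
    for (message <- messagesOnly(stream)) {
      println(process(message))
    }
  }
}
```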

--------------------------------------------------------------------------------

Would love to get any thoughts on this. Given that this is an API
change that would require code changes for consumers, I wanted to send this
around for comments/objections before proceeding further.

Thanks,

Joel

Re: API change in consumer

Posted by Joel Koshy <jj...@gmail.com>.
Thanks for the feedback.


> 1. Could you move this to the wiki so we could evolve some of the
> naming as suggestions come in?
>

Created this wiki:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+API+changes


> 2. KafkaMessageAndMetadataStream is a pretty unwieldy name. Can we
> just call it KafkaMessageStream or KafkaStream or something like that?
>

Noted on wiki.


> 3. Do we need to have two methods? Couldn't we just allow regular
> expressions in the topic name? I am not sure about the details of a
> TopicFilterSpec; maybe you could explain that?
>

Right now, yes - the first returns a Map of topic -> list[stream], the
second returns a list[stream]. I had to give createMessageStreamsByFilter
a different name (from createMessageStreams) because the Scala compiler
won't let two overloaded alternatives of a method both define default
arguments. TopicFilterSpec is described on the wiki.
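
To spell out the compiler restriction, here is a small sketch. The method
names and the String return values are simplified, hypothetical stand-ins
for the connector methods; only the overloading rule itself is the point.

```scala
object OverloadSketch {
  // scalac rejects a pair like the following, roughly with "multiple
  // overloaded alternatives of method create define default arguments":
  //
  //   def create(topicCountMap: Map[String, Int], tag: String = "by-map") = ...
  //   def create(filter: String, numStreams: Int = 1) = ...

  // Workaround: give the two methods distinct names, so each one can keep
  // its own default arguments.
  def createStreams(topicCountMap: Map[String, Int],
                    tag: String = "by-map"): String =
    tag + ":" + topicCountMap.keys.toList.sorted.mkString(",")

  def createStreamsByFilter(filter: String, numStreams: Int = 1): String =
    "by-filter:" + filter + "x" + numStreams

  def main(args: Array[String]): Unit = {
    println(createStreams(Map("b" -> 1, "a" -> 2)))
    println(createStreamsByFilter("events.*"))
  }
}
```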


> 4. I feel the current API which you retain, where you provide a map of
> topic=>thread_num is extremely unintuitive. I think we should rethink
> this pattern.
>

An alternate API would be just createMessageStreams(topic, numStreams) and
return a list[Stream]. That would make it similar to/consistent with the
createMessageStreamsByFilter API. However, zkconsumerconnector appears to
be pretty much broken in its ability to support multiple calls to
createMessageStreams/consume. KAFKA-242 is one example, but I'm sure there
are more issues than that. Until that is fixed, the only way to create
streams for multiple topics through the same connector is through the
current Map{topic -> threadnum} call, and the createMessageStreamsByFilter
call.


> 5. Does the MessageAndOffset class also contain the partition? That is
> needed, since offsets aren't unique. It might also be useful in some
> situations to have the topic there as well, because if you merge the
> contents of two iterators you can process many such iterators at once
> but you won't know which topic a given message came from. We would
> probably need to rename this class to something else, not sure what.
> MessageInfo? StreamElement? We should think on this....
>

You mean the MessageAndMetadata class? Right now, no. The reason is that
partition currently makes sense only with broker ID. In 0.8 broker ID will
not be essential. So this is related to the point about metadata evolution.
Right now (for KAFKA-249) and current uses of MessageAndOffset we only need
to have topic and offset so that's why it only contains those two metadata
fields.
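
A small sketch of why the topic field matters once iterators are merged: the
elements below come from two in-memory iterators standing in for consumer
streams, and the countByTopic helper is hypothetical. The iterators are simply
concatenated here; a real consumer would more likely read them from separate
threads.

```scala
object MergeSketch {
  // Local copy of the proposed element type, for a self-contained example.
  case class MessageAndMetadata[T](message: T,
                                   topic: String = "",
                                   offset: Long = -1L)

  // Concatenate several iterators and count messages per topic. Without
  // the topic field, the origin of each element would be lost after merging.
  def countByTopic[T](streams: Seq[Iterator[MessageAndMetadata[T]]]): Map[String, Int] = {
    val merged = streams.iterator.flatMap(s => s)
    merged.foldLeft(Map.empty[String, Int]) { (counts, m) =>
      counts.updated(m.topic, counts.getOrElse(m.topic, 0) + 1)
    }
  }

  def main(args: Array[String]): Unit = {
    // Both streams start at offset 0, so the offset alone cannot identify
    // a message once the streams are merged.
    val a = Iterator(MessageAndMetadata("x", "t1", 0L), MessageAndMetadata("y", "t1", 1L))
    val b = Iterator(MessageAndMetadata("z", "t2", 0L))
    println(countByTopic(Seq(a, b)))
  }
}
```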

Thanks,

Joel



Re: API change in consumer

Posted by Jay Kreps <ja...@gmail.com>.
Thanks for putting detailed thought into this!

A few quick comments, and then I will think more:
1. Could you move this to the wiki so we could evolve some of the
naming as suggestions come in?
2. KafkaMessageAndMetadataStream is a pretty unwieldy name. Can we
just call it KafkaMessageStream or KafkaStream or something like that?
3. Do we need to have two methods? Couldn't we just allow regular
expressions in the topic name? I am not sure about the details of a
TopicFilterSpec; maybe you could explain that?
4. I feel the current API which you retain, where you provide a map of
topic=>thread_num is extremely unintuitive. I think we should rethink
this pattern.
5. Does the MessageAndOffset class also contain the partition? That is
needed, since offsets aren't unique. It might also be useful in some
situations to have the topic there as well, because if you merge the
contents of two iterators you can process many such iterators at once
but you won't know which topic a given message came from. We would
probably need to rename this class to something else, not sure what.
MessageInfo? StreamElement? We should think on this....

-Jay
