You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Joel Koshy <jj...@gmail.com> on 2012/03/28 03:13:50 UTC

Consumer API extension in KAFKA-249

The newest patch for KAFKA-249 adds a new method to the consumer connector
(after feedback on the first patch).  Since this is an addition to the
consumer
API, I wanted to send it around for comments/concerns.

The new method basically allows the consumer connector to subscribe to a
whitelist or blacklist of topics.

Scala API:

  /**
   *  Create a list of message streams for all topics that match a given
filter.
   *
   *  @param filterSpec TopicFilterSpec encapsulating a Java-style regex
whitelist
   *         or blacklist.
   *  @param numStreams number of KafkaMessageAndTopicStream to create
   *         or blacklist.
   *  @return a list of KafkaMessageAndTopicStream that provide iterator
over
   *           messages from allowed topics.
   */
  def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec,
                                      numStreams: Int = 1,
                                      decoder: Decoder[T] = new
DefaultDecoder)
    : Seq[KafkaMessageAndTopicStream[T]]

TopicFilterSpec can be either a Whitelist or Blacklist.

The consumer may now receive messages from multiple topics due to
wildcarding. This is why it returns a KafkaMessageAndTopicStream which
allows iteration over MessageAndTopic objects.

Example:

val stream = zkConnector.createMessageStreamsByFilter(new
Whitelist("whitetopics.*")).get(0)

for (messageAndTopic <- stream) {
  println("Message from topic %s: %s".format(messageAndTopic.topic,
messageAndTopic.message))
}

The same method (and default argument variants) is also exposed in the Java
API.

Feedback/concerns/objections?

Thanks,

Joel