Posted to users@kafka.apache.org by Joel Koshy <jj...@gmail.com> on 2012/03/28 03:13:50 UTC
Consumer API extension in KAFKA-249
The newest patch for KAFKA-249 adds a new method to the consumer connector
(after feedback on the first patch). Since this is an addition to the consumer
API, I wanted to send it around for comments/concerns.
The new method basically allows the consumer connector to subscribe to a
whitelist or blacklist of topics.
Scala API:
/**
 * Create a list of message streams for all topics that match a given filter.
 *
 * @param filterSpec TopicFilterSpec encapsulating a Java-style regex whitelist
 *                   or blacklist.
 * @param numStreams number of KafkaMessageAndTopicStream to create.
 * @return a list of KafkaMessageAndTopicStream that provide iterators over
 *         messages from allowed topics.
 */
def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec,
                                    numStreams: Int = 1,
                                    decoder: Decoder[T] = new DefaultDecoder)
  : Seq[KafkaMessageAndTopicStream[T]]
TopicFilterSpec can be either a Whitelist or Blacklist.
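To make the whitelist/blacklist distinction concrete, here is a minimal, self-contained sketch of how such a filter could decide whether a topic is allowed. The names TopicFilterSketch, WhitelistSketch, BlacklistSketch and isTopicAllowed are hypothetical stand-ins, not the classes in the patch, and the sketch assumes the regex must match the whole topic name; the actual matching rules in KAFKA-249 may differ.

```scala
import java.util.regex.Pattern

// Hypothetical stand-ins for the proposed filter types. The real classes
// in the KAFKA-249 patch may differ in name, shape and matching rules.
sealed trait TopicFilterSketch {
  def regex: String

  // Compiled on first use. Matcher.matches() requires the entire topic
  // name to match the regex (an assumption of this sketch).
  protected lazy val pattern: Pattern = Pattern.compile(regex)

  def isTopicAllowed(topic: String): Boolean
}

// A whitelist allows exactly the topics that match the regex.
final case class WhitelistSketch(regex: String) extends TopicFilterSketch {
  def isTopicAllowed(topic: String): Boolean =
    pattern.matcher(topic).matches()
}

// A blacklist allows exactly the topics that do NOT match the regex.
final case class BlacklistSketch(regex: String) extends TopicFilterSketch {
  def isTopicAllowed(topic: String): Boolean =
    !pattern.matcher(topic).matches()
}

object TopicFilterDemo {
  def main(args: Array[String]): Unit = {
    // Made-up topic names, for illustration only.
    val topics = Seq("whitetopics.clicks", "whitetopics.views", "audit")

    println(topics.filter(WhitelistSketch("whitetopics.*").isTopicAllowed))
    println(topics.filter(BlacklistSketch("whitetopics.*").isTopicAllowed))
  }
}
```

With these made-up topic names, the whitelist keeps the two whitetopics.* topics and the blacklist keeps only audit.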
Because of wildcarding, a consumer may now receive messages from multiple
topics. This is why the new method returns KafkaMessageAndTopicStream, which
allows iteration over MessageAndTopic objects.
Example:
// numStreams defaults to 1, so take the single stream from the returned Seq.
val stream = zkConnector.createMessageStreamsByFilter(new Whitelist("whitetopics.*")).head
for (messageAndTopic <- stream) {
  println("Message from topic %s: %s".format(messageAndTopic.topic, messageAndTopic.message))
}
The same method (and default argument variants) is also exposed in the Java
API.
Feedback/concerns/objections?
Thanks,
Joel