Posted to users@kafka.apache.org by Matan Amir <ma...@voxer.com> on 2012/10/07 05:27:25 UTC

Consumer for multiple topics

Hi All,

We're using the provided high-level consumers (kafka.consumer.Consumer) and
I was wondering how common or sensible it would be to have one Consumer
consume messages from multiple topics, something like:

        // topic name -> number of streams requested for that topic
        ImmutableMap<String, Integer> topics =
                ImmutableMap.of("topic1", 3, "topic2", 2, "topic3", 5);
        Map<String, List<KafkaStream<Message>>> topicMessageStreams =
                connector.createMessageStreams(topics);
        // one thread per stream: 3 + 2 + 5 = 10
        ExecutorService executor = Executors.newFixedThreadPool(3 + 2 + 5);

        for (final List<KafkaStream<Message>> topicStreams : topicMessageStreams.values()) {
            for (final KafkaStream<Message> stream : topicStreams) {
                executor.submit(new Runnable() {
                    public void run() {
                        for (MessageAndMetadata<Message> msgAndMetadata : stream) {
                            // do stuff
                        }
                    }
                });
            }
        }

Any reason this would be a really bad idea?

Much appreciated,
Matan

Re: Consumer for multiple topics

Posted by Jay Kreps <ja...@gmail.com>.
Looks good to me. Consuming multiple topics is definitely an intended use
case.

-Jay
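
A note on the thread pool in the snippet from the question: iterating a
stream normally blocks while waiting for messages, so each stream needs its
own thread and the pool should be at least as large as the total number of
streams requested. A minimal sketch of deriving that size from the topic map
instead of hard-coding 3+2+5 is below. It uses only the JDK; the class name
PoolSizing and the method totalStreams are hypothetical names chosen for
illustration and are not part of Kafka.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper, not part of Kafka: sizes a thread pool from the
// topic -> stream-count map that would be passed to createMessageStreams.
class PoolSizing {

    // Sum the requested stream counts across all topics.
    static int totalStreams(Map<String, Integer> topicCountMap) {
        int total = 0;
        for (int count : topicCountMap.values()) {
            total += count;
        }
        return total;
    }

    public static void main(String[] args) {
        // Same topic/stream counts as in the question.
        Map<String, Integer> topics = new LinkedHashMap<String, Integer>();
        topics.put("topic1", 3);
        topics.put("topic2", 2);
        topics.put("topic3", 5);

        // One thread per stream, so no stream is left without a consumer thread.
        int threads = totalStreams(topics);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        System.out.println("pool size: " + threads);

        // In real code the per-stream Runnables would be submitted here.
        executor.shutdown();
    }
}
```

With this, changing a stream count in the map automatically changes the pool
size, so the two cannot drift apart.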
