Posted to users@kafka.apache.org by Kumar Pradeep <KP...@novell.com> on 2014/06/02 10:14:50 UTC

CreateMessageStream API

Hi,
I need to create a single topic with a single partition in Kafka. I am doing this because the order of messages coming from my Kafka producer is very important to me (if I had multiple partitions with multiple threads doing I/O, the ordering of my messages would not be guaranteed; at least this is what I have read). I do, however, have multiple consumers consuming the same message stream (strictly in the order that the producer produced the messages).
Now, since I am going to have only one thread per consumer in Kafka, I would like to think that I don't have to write the following code:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
 
I need to create message streams without passing a topic count map. I know that I have a single topic and a single thread, and I don't want to create a hashmap of topics and threads.
If you can suggest the best way to deal with this, I would appreciate it a lot.
Thanks, Pradeep.
 

Re: CreateMessageStream API

Posted by Jun Rao <ju...@gmail.com>.
Currently, that's probably what you have to do. We are designing a consumer
with better APIs
(https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design).

Thanks,

Jun



Re: CreateMessageStream API

Posted by Neha Narkhede <ne...@gmail.com>.
Pradeep,

Kafka guarantees ordering of data per partition. So if you have several
producers that send data with a key, you will have all messages for the
same key ordered in a single partition. If you attach a consumer to one
partition, you can then consume data in order per key. It will help to know
the details of your application to recommend a complete solution though.
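
To illustrate why messages with the same key stay in order, here is a minimal sketch of hash-based key-to-partition mapping. It is a simplified illustration and not Kafka's actual partitioner code; the class name KeyPartitionSketch, the method partitionFor, and the sample keys are hypothetical.

```java
public class KeyPartitionSketch {
    /**
     * Maps a key to a partition index in [0, numPartitions).
     * The mapping is deterministic, so the same key always lands in the
     * same partition, and ordering within that partition is preserved.
     */
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so a negative hash code cannot yield a negative index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 4; // assumed partition count, for illustration only
        for (String key : new String[] {"order-17", "order-42", "order-17"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

With a single partition, every key maps to partition 0, which is the strict global ordering case described in the original question.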

As far as the consumer code is concerned, you will have to add one entry
per topic in the hashmap with the number of threads set to 1. We are trying
to improve the consumer APIs
<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html>
in Kafka 0.9. Would love to collect feedback from you.
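
For the single-topic, single-thread case, the map boilerplate can at least be hidden behind two small helpers. This is only a sketch: SingleStreamHelper, singleTopicCount, and onlyStream are hypothetical names, not part of the Kafka API, and it assumes createMessageStreams only reads the map it is given.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class SingleStreamHelper {
    /** Builds the one-entry topic count map: one topic, one thread. */
    static Map<String, Integer> singleTopicCount(String topic) {
        // Immutable single-entry map; assumes the consumer does not modify it.
        return Collections.singletonMap(topic, 1);
    }

    /** Returns the only stream for the topic from a map shaped like the createMessageStreams result. */
    static <S> S onlyStream(Map<String, List<S>> streamsByTopic, String topic) {
        List<S> streams = streamsByTopic.get(topic);
        if (streams == null || streams.size() != 1) {
            throw new IllegalStateException("expected exactly one stream for topic " + topic);
        }
        return streams.get(0);
    }
}
```

Assuming consumer is the connector from the original post, usage would look like:
KafkaStream<byte[], byte[]> stream = SingleStreamHelper.onlyStream(consumer.createMessageStreams(SingleStreamHelper.singleTopicCount(topic)), topic);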

Thanks,
Neha

