You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Mukesh Jha <me...@gmail.com> on 2015/01/07 15:00:09 UTC

KafkaUtils not consuming all the data from all partitions

Hi Guys,

I have a kafka topic having 90 partitions and I running
SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
kafka-receivers.

My streaming is running fine and there is no delay in processing, just that
some partitions data is never getting picked up. From the kafka console I
can see that each receiver is consuming data from 9 partitions but the lag
for some offsets keeps on increasing.

Below is my kafka-consumers parameters.

Any of you have face this kind of issue, if so then do you have any
pointers to fix it?

Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
kafkaConf.put("group.id", kafkaConsumerGroup);
kafkaConf.put("consumer.timeout.ms", "30000");
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("fetch.message.max.bytes", "20000000");
kafkaConf.put("zookeeper.session.timeout.ms", "6000");
kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
kafkaConf.put("zookeeper.sync.time.ms", "2000");
kafkaConf.put("rebalance.backoff.ms", "10000");
kafkaConf.put("rebalance.max.retries", "20");

-- 
Thanks & Regards,

*Mukesh Jha <me...@gmail.com>*

Re: KafkaUtils not consuming all the data from all partitions

Posted by Gerard Maas <ge...@gmail.com>.
AFAIK, there're two levels of parallelism related to the Spark Kafka
consumer:

At JVM level: For each receiver, one can specify the number of threads for
a given topic, provided as a map [topic -> nthreads].  This will
effectively start n JVM threads consuming partitions of that kafka topic.
At Cluster level: One can create several DStreams, and each will have one
receiver and use 1 executor core in Spark each DStream will have its
receiver as defined in the previous line.

What you need to ensure is that there's a consumer attached to each
partition of your kafka topic. That is, nthreads * nReceivers =
#kafka_partitions(topic)

e.g:
Given
nPartitions = #partitions of your topic
nThreads = #of threads per receiver

val kafkaStreams = (1 to nPartitions/nThreads).map{ i =>
KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> nThreads),
StorageLevel.MEMORY_ONLY_SER)

For this to work, you need at least (nPartitions/nThreads +1) cores in your
Spark cluster, although I would recommend to have 2-3x
(nPartitions/nThreads).
(and don't forget to union the streams after creation)

-kr, Gerard.



On Wed, Jan 7, 2015 at 4:43 PM, <fr...@typesafe.com> wrote:

> - You are launching up to 10 threads/topic per Receiver. Are you sure your
> receivers can support 10 threads each ? (i.e. in the default configuration,
> do they have 10 cores). If they have 2 cores, that would explain why this
> works with 20 partitions or less.
>
> - If you have 90 partitions, why start 10 Streams, each consuming 10
> partitions, and then removing the stream at index 0 ? Why not simply start
> 10 streams with 9 partitions ? Or, more simply,
>
> val kafkaStreams = (1 to numPartitions).map { _ =>
> KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> 1),
> StorageLevel.MEMORY_ONLY_SER)
>
> - You’re consuming up to 10 local threads *per topic*, on each of your 10
> receivers. That’s a lot of threads (10* size of kafkaTopicsList) co-located
> on a single machine. You mentioned having a single Kafka topic with 90
> partitions. Why not have a single-element topicMap ?
>
> —
> FG
>
>
> On Wed, Jan 7, 2015 at 4:05 PM, Mukesh Jha <me...@gmail.com>
> wrote:
>
>>  I understand that I've to create 10 parallel streams. My code is
>> running fine when the no of partitions is ~20, but when I increase the no
>> of partitions I keep getting in this issue.
>>
>> Below is my code to create kafka streams, along with the configs used.
>>
>>     Map<String, String> kafkaConf = new HashMap<String, String>();
>>     kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>     kafkaConf.put("group.id", kafkaConsumerGroup);
>>     kafkaConf.put("consumer.timeout.ms", "30000");
>>     kafkaConf.put("auto.offset.reset", "largest");
>>     kafkaConf.put("fetch.message.max.bytes", "20000000");
>>     kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>     kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>     kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>     kafkaConf.put("rebalance.backoff.ms", "10000");
>>     kafkaConf.put("rebalance.max.retries", "20");
>>     String[] topics = kafkaTopicsList;
>>     int numStreams = numKafkaThreads; // this is *10*
>>     Map<String, Integer> topicMap = new HashMap<>();
>>     for (String topic: topics) {
>>       topicMap.put(topic, numStreams);
>>     }
>>
>>     List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
>> ArrayList<>(numStreams);
>>     for (int i = 0; i < numStreams; i++) {
>>       kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class,
>> byte[].class, DefaultDecoder.class, DefaultDecoder.class, kafkaConf,
>> topicMap, StorageLevel.MEMORY_ONLY_SER()));
>>     }
>>     JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0),
>> kafkaStreams);
>>
>>
>> On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas <ge...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Could you add the code where you create the Kafka consumer?
>>>
>>> -kr, Gerard.
>>>
>>> On Wed, Jan 7, 2015 at 3:43 PM, <fr...@typesafe.com> wrote:
>>>
>>>> Hi Mukesh,
>>>>
>>>> If my understanding is correct, each Stream only has a single Receiver.
>>>> So, if you have each receiver consuming 9 partitions, you need 10 input
>>>> DStreams to create 10 concurrent receivers:
>>>>
>>>>
>>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>>>>
>>>> Would you mind sharing a bit more on how you achieve this ?
>>>>
>>>> —
>>>> FG
>>>>
>>>>
>>>> On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha <me...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Guys,
>>>>>
>>>>> I have a kafka topic having 90 partitions and I running
>>>>> SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
>>>>> kafka-receivers.
>>>>>
>>>>> My streaming is running fine and there is no delay in processing, just
>>>>> that some partitions data is never getting picked up. From the kafka
>>>>> console I can see that each receiver is consuming data from 9 partitions
>>>>> but the lag for some offsets keeps on increasing.
>>>>>
>>>>> Below is my kafka-consumers parameters.
>>>>>
>>>>> Any of you have face this kind of issue, if so then do you have any
>>>>> pointers to fix it?
>>>>>
>>>>>  Map<String, String> kafkaConf = new HashMap<String, String>();
>>>>>  kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>>>>  kafkaConf.put("group.id", kafkaConsumerGroup);
>>>>>  kafkaConf.put("consumer.timeout.ms", "30000");
>>>>>  kafkaConf.put("auto.offset.reset", "largest");
>>>>>  kafkaConf.put("fetch.message.max.bytes", "20000000");
>>>>>  kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>>>>  kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>>>>  kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>>>>  kafkaConf.put("rebalance.backoff.ms", "10000");
>>>>>  kafkaConf.put("rebalance.max.retries", "20");
>>>>>
>>>>> --
>>>>> Thanks & Regards,
>>>>>
>>>>> Mukesh Jha <me...@gmail.com>
>>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>>
>>
>> Thanks & Regards,
>>
>> Mukesh Jha <me...@gmail.com>
>>
>
>

Re: KafkaUtils not consuming all the data from all partitions

Posted by Gerard Maas <ge...@gmail.com>.
AFAIK, there're two levels of parallelism related to the Spark Kafka
consumer:

At JVM level: For each receiver, one can specify the number of threads for
a given topic, provided as a map [topic -> nthreads].  This will
effectively start n JVM threads consuming partitions of that kafka topic.
At Cluster level: One can create several DStreams, and each will have one
receiver and use 1 executor core in Spark each DStream will have its
receiver as defined in the previous line.

What you need to ensure is that there's a consumer attached to each
partition of your kafka topic. That is, nthreads * nReceivers =
#kafka_partitions(topic)

e.g:
Given
nPartitions = #partitions of your topic
nThreads = #of threads per receiver

val kafkaStreams = (1 to nPartitions/nThreads).map{ i =>
KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> nThreads),
StorageLevel.MEMORY_ONLY_SER)

For this to work, you need at least (nPartitions/nThreads +1) cores in your
Spark cluster, although I would recommend to have 2-3x
(nPartitions/nThreads).
(and don't forget to union the streams after creation)

-kr, Gerard.



On Wed, Jan 7, 2015 at 4:43 PM, <fr...@typesafe.com> wrote:

> - You are launching up to 10 threads/topic per Receiver. Are you sure your
> receivers can support 10 threads each ? (i.e. in the default configuration,
> do they have 10 cores). If they have 2 cores, that would explain why this
> works with 20 partitions or less.
>
> - If you have 90 partitions, why start 10 Streams, each consuming 10
> partitions, and then removing the stream at index 0 ? Why not simply start
> 10 streams with 9 partitions ? Or, more simply,
>
> val kafkaStreams = (1 to numPartitions).map { _ =>
> KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> 1),
> StorageLevel.MEMORY_ONLY_SER)
>
> - You’re consuming up to 10 local threads *per topic*, on each of your 10
> receivers. That’s a lot of threads (10* size of kafkaTopicsList) co-located
> on a single machine. You mentioned having a single Kafka topic with 90
> partitions. Why not have a single-element topicMap ?
>
> —
> FG
>
>
> On Wed, Jan 7, 2015 at 4:05 PM, Mukesh Jha <me...@gmail.com>
> wrote:
>
>>  I understand that I've to create 10 parallel streams. My code is
>> running fine when the no of partitions is ~20, but when I increase the no
>> of partitions I keep getting in this issue.
>>
>> Below is my code to create kafka streams, along with the configs used.
>>
>>     Map<String, String> kafkaConf = new HashMap<String, String>();
>>     kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>     kafkaConf.put("group.id", kafkaConsumerGroup);
>>     kafkaConf.put("consumer.timeout.ms", "30000");
>>     kafkaConf.put("auto.offset.reset", "largest");
>>     kafkaConf.put("fetch.message.max.bytes", "20000000");
>>     kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>     kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>     kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>     kafkaConf.put("rebalance.backoff.ms", "10000");
>>     kafkaConf.put("rebalance.max.retries", "20");
>>     String[] topics = kafkaTopicsList;
>>     int numStreams = numKafkaThreads; // this is *10*
>>     Map<String, Integer> topicMap = new HashMap<>();
>>     for (String topic: topics) {
>>       topicMap.put(topic, numStreams);
>>     }
>>
>>     List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
>> ArrayList<>(numStreams);
>>     for (int i = 0; i < numStreams; i++) {
>>       kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class,
>> byte[].class, DefaultDecoder.class, DefaultDecoder.class, kafkaConf,
>> topicMap, StorageLevel.MEMORY_ONLY_SER()));
>>     }
>>     JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0),
>> kafkaStreams);
>>
>>
>> On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas <ge...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Could you add the code where you create the Kafka consumer?
>>>
>>> -kr, Gerard.
>>>
>>> On Wed, Jan 7, 2015 at 3:43 PM, <fr...@typesafe.com> wrote:
>>>
>>>> Hi Mukesh,
>>>>
>>>> If my understanding is correct, each Stream only has a single Receiver.
>>>> So, if you have each receiver consuming 9 partitions, you need 10 input
>>>> DStreams to create 10 concurrent receivers:
>>>>
>>>>
>>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>>>>
>>>> Would you mind sharing a bit more on how you achieve this ?
>>>>
>>>> —
>>>> FG
>>>>
>>>>
>>>> On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha <me...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Guys,
>>>>>
>>>>> I have a kafka topic having 90 partitions and I running
>>>>> SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
>>>>> kafka-receivers.
>>>>>
>>>>> My streaming is running fine and there is no delay in processing, just
>>>>> that some partitions data is never getting picked up. From the kafka
>>>>> console I can see that each receiver is consuming data from 9 partitions
>>>>> but the lag for some offsets keeps on increasing.
>>>>>
>>>>> Below is my kafka-consumers parameters.
>>>>>
>>>>> Any of you have face this kind of issue, if so then do you have any
>>>>> pointers to fix it?
>>>>>
>>>>>  Map<String, String> kafkaConf = new HashMap<String, String>();
>>>>>  kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>>>>  kafkaConf.put("group.id", kafkaConsumerGroup);
>>>>>  kafkaConf.put("consumer.timeout.ms", "30000");
>>>>>  kafkaConf.put("auto.offset.reset", "largest");
>>>>>  kafkaConf.put("fetch.message.max.bytes", "20000000");
>>>>>  kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>>>>  kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>>>>  kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>>>>  kafkaConf.put("rebalance.backoff.ms", "10000");
>>>>>  kafkaConf.put("rebalance.max.retries", "20");
>>>>>
>>>>> --
>>>>> Thanks & Regards,
>>>>>
>>>>> Mukesh Jha <me...@gmail.com>
>>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>>
>>
>> Thanks & Regards,
>>
>> Mukesh Jha <me...@gmail.com>
>>
>
>

Re: KafkaUtils not consuming all the data from all partitions

Posted by fr...@typesafe.com.
- You are launching up to 10 threads/topic per Receiver. Are you sure your receivers can support 10 threads each ? (i.e. in the default configuration, do they have 10 cores). If they have 2 cores, that would explain why this works with 20 partitions or less.




- If you have 90 partitions, why start 10 Streams, each consuming 10 partitions, and then removing the stream at index 0 ? Why not simply start 10 streams with 9 partitions ? Or, more simply,




val kafkaStreams = (1 to numPartitions).map { _ =>
 KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> 1),
 StorageLevel.MEMORY_ONLY_SER)




- You’re consuming up to 10 local threads *per topic*, on each of your 10 receivers. That’s a lot of threads (10* size of kafkaTopicsList) co-located on a single machine. You mentioned having a single Kafka topic with 90 partitions. Why not have a single-element topicMap ?


—
FG

On Wed, Jan 7, 2015 at 4:05 PM, Mukesh Jha <me...@gmail.com>
wrote:

> I understand that I've to create 10 parallel streams. My code is running
> fine when the no of partitions is ~20, but when I increase the no of
> partitions I keep getting in this issue.
> Below is my code to create kafka streams, along with the configs used.
>     Map<String, String> kafkaConf = new HashMap<String, String>();
>     kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>     kafkaConf.put("group.id", kafkaConsumerGroup);
>     kafkaConf.put("consumer.timeout.ms", "30000");
>     kafkaConf.put("auto.offset.reset", "largest");
>     kafkaConf.put("fetch.message.max.bytes", "20000000");
>     kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>     kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>     kafkaConf.put("zookeeper.sync.time.ms", "2000");
>     kafkaConf.put("rebalance.backoff.ms", "10000");
>     kafkaConf.put("rebalance.max.retries", "20");
>     String[] topics = kafkaTopicsList;
>     int numStreams = numKafkaThreads; // this is *10*
>     Map<String, Integer> topicMap = new HashMap<>();
>     for (String topic: topics) {
>       topicMap.put(topic, numStreams);
>     }
>     List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
> ArrayList<>(numStreams);
>     for (int i = 0; i < numStreams; i++) {
>       kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class,
> byte[].class, DefaultDecoder.class, DefaultDecoder.class, kafkaConf,
> topicMap, StorageLevel.MEMORY_ONLY_SER()));
>     }
>     JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0),
> kafkaStreams);
> On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas <ge...@gmail.com> wrote:
>> Hi,
>>
>> Could you add the code where you create the Kafka consumer?
>>
>> -kr, Gerard.
>>
>> On Wed, Jan 7, 2015 at 3:43 PM, <fr...@typesafe.com> wrote:
>>
>>> Hi Mukesh,
>>>
>>> If my understanding is correct, each Stream only has a single Receiver.
>>> So, if you have each receiver consuming 9 partitions, you need 10 input
>>> DStreams to create 10 concurrent receivers:
>>>
>>>
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>>>
>>> Would you mind sharing a bit more on how you achieve this ?
>>>
>>> --
>>> FG
>>>
>>>
>>> On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha <me...@gmail.com>
>>> wrote:
>>>
>>>> Hi Guys,
>>>>
>>>> I have a kafka topic having 90 partitions and I running
>>>> SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
>>>> kafka-receivers.
>>>>
>>>> My streaming is running fine and there is no delay in processing, just
>>>> that some partitions data is never getting picked up. From the kafka
>>>> console I can see that each receiver is consuming data from 9 partitions
>>>> but the lag for some offsets keeps on increasing.
>>>>
>>>> Below is my kafka-consumers parameters.
>>>>
>>>> Any of you have face this kind of issue, if so then do you have any
>>>> pointers to fix it?
>>>>
>>>>  Map<String, String> kafkaConf = new HashMap<String, String>();
>>>>  kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>>>  kafkaConf.put("group.id", kafkaConsumerGroup);
>>>>  kafkaConf.put("consumer.timeout.ms", "30000");
>>>>  kafkaConf.put("auto.offset.reset", "largest");
>>>>  kafkaConf.put("fetch.message.max.bytes", "20000000");
>>>>  kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>>>  kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>>>  kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>>>  kafkaConf.put("rebalance.backoff.ms", "10000");
>>>>  kafkaConf.put("rebalance.max.retries", "20");
>>>>
>>>> --
>>>> Thanks & Regards,
>>>>
>>>> Mukesh Jha <me...@gmail.com>
>>>>
>>>
>>>
>>
> -- 
> Thanks & Regards,
> *Mukesh Jha <me...@gmail.com>*

Re: KafkaUtils not consuming all the data from all partitions

Posted by fr...@typesafe.com.
- You are launching up to 10 threads/topic per Receiver. Are you sure your receivers can support 10 threads each ? (i.e. in the default configuration, do they have 10 cores). If they have 2 cores, that would explain why this works with 20 partitions or less.




- If you have 90 partitions, why start 10 Streams, each consuming 10 partitions, and then removing the stream at index 0 ? Why not simply start 10 streams with 9 partitions ? Or, more simply,




val kafkaStreams = (1 to numPartitions).map { _ =>
 KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> 1),
 StorageLevel.MEMORY_ONLY_SER)




- You’re consuming up to 10 local threads *per topic*, on each of your 10 receivers. That’s a lot of threads (10* size of kafkaTopicsList) co-located on a single machine. You mentioned having a single Kafka topic with 90 partitions. Why not have a single-element topicMap ?


—
FG

On Wed, Jan 7, 2015 at 4:05 PM, Mukesh Jha <me...@gmail.com>
wrote:

> I understand that I've to create 10 parallel streams. My code is running
> fine when the no of partitions is ~20, but when I increase the no of
> partitions I keep getting in this issue.
> Below is my code to create kafka streams, along with the configs used.
>     Map<String, String> kafkaConf = new HashMap<String, String>();
>     kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>     kafkaConf.put("group.id", kafkaConsumerGroup);
>     kafkaConf.put("consumer.timeout.ms", "30000");
>     kafkaConf.put("auto.offset.reset", "largest");
>     kafkaConf.put("fetch.message.max.bytes", "20000000");
>     kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>     kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>     kafkaConf.put("zookeeper.sync.time.ms", "2000");
>     kafkaConf.put("rebalance.backoff.ms", "10000");
>     kafkaConf.put("rebalance.max.retries", "20");
>     String[] topics = kafkaTopicsList;
>     int numStreams = numKafkaThreads; // this is *10*
>     Map<String, Integer> topicMap = new HashMap<>();
>     for (String topic: topics) {
>       topicMap.put(topic, numStreams);
>     }
>     List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
> ArrayList<>(numStreams);
>     for (int i = 0; i < numStreams; i++) {
>       kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class,
> byte[].class, DefaultDecoder.class, DefaultDecoder.class, kafkaConf,
> topicMap, StorageLevel.MEMORY_ONLY_SER()));
>     }
>     JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0),
> kafkaStreams);
> On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas <ge...@gmail.com> wrote:
>> Hi,
>>
>> Could you add the code where you create the Kafka consumer?
>>
>> -kr, Gerard.
>>
>> On Wed, Jan 7, 2015 at 3:43 PM, <fr...@typesafe.com> wrote:
>>
>>> Hi Mukesh,
>>>
>>> If my understanding is correct, each Stream only has a single Receiver.
>>> So, if you have each receiver consuming 9 partitions, you need 10 input
>>> DStreams to create 10 concurrent receivers:
>>>
>>>
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>>>
>>> Would you mind sharing a bit more on how you achieve this ?
>>>
>>> --
>>> FG
>>>
>>>
>>> On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha <me...@gmail.com>
>>> wrote:
>>>
>>>> Hi Guys,
>>>>
>>>> I have a kafka topic having 90 partitions and I running
>>>> SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
>>>> kafka-receivers.
>>>>
>>>> My streaming is running fine and there is no delay in processing, just
>>>> that some partitions data is never getting picked up. From the kafka
>>>> console I can see that each receiver is consuming data from 9 partitions
>>>> but the lag for some offsets keeps on increasing.
>>>>
>>>> Below is my kafka-consumers parameters.
>>>>
>>>> Any of you have face this kind of issue, if so then do you have any
>>>> pointers to fix it?
>>>>
>>>>  Map<String, String> kafkaConf = new HashMap<String, String>();
>>>>  kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>>>  kafkaConf.put("group.id", kafkaConsumerGroup);
>>>>  kafkaConf.put("consumer.timeout.ms", "30000");
>>>>  kafkaConf.put("auto.offset.reset", "largest");
>>>>  kafkaConf.put("fetch.message.max.bytes", "20000000");
>>>>  kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>>>  kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>>>  kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>>>  kafkaConf.put("rebalance.backoff.ms", "10000");
>>>>  kafkaConf.put("rebalance.max.retries", "20");
>>>>
>>>> --
>>>> Thanks & Regards,
>>>>
>>>> Mukesh Jha <me...@gmail.com>
>>>>
>>>
>>>
>>
> -- 
> Thanks & Regards,
> *Mukesh Jha <me...@gmail.com>*

Re: KafkaUtils not consuming all the data from all partitions

Posted by Mukesh Jha <me...@gmail.com>.
I understand that I've to create 10 parallel streams. My code is running
fine when the no of partitions is ~20, but when I increase the no of
partitions I keep getting in this issue.

Below is my code to create kafka streams, along with the configs used.

    Map<String, String> kafkaConf = new HashMap<String, String>();
    kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
    kafkaConf.put("group.id", kafkaConsumerGroup);
    kafkaConf.put("consumer.timeout.ms", "30000");
    kafkaConf.put("auto.offset.reset", "largest");
    kafkaConf.put("fetch.message.max.bytes", "20000000");
    kafkaConf.put("zookeeper.session.timeout.ms", "6000");
    kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
    kafkaConf.put("zookeeper.sync.time.ms", "2000");
    kafkaConf.put("rebalance.backoff.ms", "10000");
    kafkaConf.put("rebalance.max.retries", "20");
    String[] topics = kafkaTopicsList;
    int numStreams = numKafkaThreads; // this is *10*
    Map<String, Integer> topicMap = new HashMap<>();
    for (String topic: topics) {
      topicMap.put(topic, numStreams);
    }

    List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
ArrayList<>(numStreams);
    for (int i = 0; i < numStreams; i++) {
      kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class,
byte[].class, DefaultDecoder.class, DefaultDecoder.class, kafkaConf,
topicMap, StorageLevel.MEMORY_ONLY_SER()));
    }
    JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0),
kafkaStreams);


On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas <ge...@gmail.com> wrote:

> Hi,
>
> Could you add the code where you create the Kafka consumer?
>
> -kr, Gerard.
>
> On Wed, Jan 7, 2015 at 3:43 PM, <fr...@typesafe.com> wrote:
>
>> Hi Mukesh,
>>
>> If my understanding is correct, each Stream only has a single Receiver.
>> So, if you have each receiver consuming 9 partitions, you need 10 input
>> DStreams to create 10 concurrent receivers:
>>
>>
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>>
>> Would you mind sharing a bit more on how you achieve this ?
>>
>> --
>> FG
>>
>>
>> On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha <me...@gmail.com>
>> wrote:
>>
>>> Hi Guys,
>>>
>>> I have a kafka topic having 90 partitions and I running
>>> SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
>>> kafka-receivers.
>>>
>>> My streaming is running fine and there is no delay in processing, just
>>> that some partitions data is never getting picked up. From the kafka
>>> console I can see that each receiver is consuming data from 9 partitions
>>> but the lag for some offsets keeps on increasing.
>>>
>>> Below is my kafka-consumers parameters.
>>>
>>> Any of you have face this kind of issue, if so then do you have any
>>> pointers to fix it?
>>>
>>>  Map<String, String> kafkaConf = new HashMap<String, String>();
>>>  kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>>  kafkaConf.put("group.id", kafkaConsumerGroup);
>>>  kafkaConf.put("consumer.timeout.ms", "30000");
>>>  kafkaConf.put("auto.offset.reset", "largest");
>>>  kafkaConf.put("fetch.message.max.bytes", "20000000");
>>>  kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>>  kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>>  kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>>  kafkaConf.put("rebalance.backoff.ms", "10000");
>>>  kafkaConf.put("rebalance.max.retries", "20");
>>>
>>> --
>>> Thanks & Regards,
>>>
>>> Mukesh Jha <me...@gmail.com>
>>>
>>
>>
>


-- 


Thanks & Regards,

*Mukesh Jha <me...@gmail.com>*

Re: KafkaUtils not consuming all the data from all partitions

Posted by Mukesh Jha <me...@gmail.com>.
I understand that I've to create 10 parallel streams. My code is running
fine when the no of partitions is ~20, but when I increase the no of
partitions I keep getting in this issue.

Below is my code to create kafka streams, along with the configs used.

    Map<String, String> kafkaConf = new HashMap<String, String>();
    kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
    kafkaConf.put("group.id", kafkaConsumerGroup);
    kafkaConf.put("consumer.timeout.ms", "30000");
    kafkaConf.put("auto.offset.reset", "largest");
    kafkaConf.put("fetch.message.max.bytes", "20000000");
    kafkaConf.put("zookeeper.session.timeout.ms", "6000");
    kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
    kafkaConf.put("zookeeper.sync.time.ms", "2000");
    kafkaConf.put("rebalance.backoff.ms", "10000");
    kafkaConf.put("rebalance.max.retries", "20");
    String[] topics = kafkaTopicsList;
    int numStreams = numKafkaThreads; // this is *10*
    Map<String, Integer> topicMap = new HashMap<>();
    for (String topic: topics) {
      topicMap.put(topic, numStreams);
    }

    List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
ArrayList<>(numStreams);
    for (int i = 0; i < numStreams; i++) {
      kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class,
byte[].class, DefaultDecoder.class, DefaultDecoder.class, kafkaConf,
topicMap, StorageLevel.MEMORY_ONLY_SER()));
    }
    JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0),
kafkaStreams);


On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas <ge...@gmail.com> wrote:

> Hi,
>
> Could you add the code where you create the Kafka consumer?
>
> -kr, Gerard.
>
> On Wed, Jan 7, 2015 at 3:43 PM, <fr...@typesafe.com> wrote:
>
>> Hi Mukesh,
>>
>> If my understanding is correct, each Stream only has a single Receiver.
>> So, if you have each receiver consuming 9 partitions, you need 10 input
>> DStreams to create 10 concurrent receivers:
>>
>>
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>>
>> Would you mind sharing a bit more on how you achieve this ?
>>
>> --
>> FG
>>
>>
>> On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha <me...@gmail.com>
>> wrote:
>>
>>> Hi Guys,
>>>
>>> I have a kafka topic having 90 partitions and I running
>>> SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
>>> kafka-receivers.
>>>
>>> My streaming is running fine and there is no delay in processing, just
>>> that some partitions data is never getting picked up. From the kafka
>>> console I can see that each receiver is consuming data from 9 partitions
>>> but the lag for some offsets keeps on increasing.
>>>
>>> Below is my kafka-consumers parameters.
>>>
>>> Any of you have face this kind of issue, if so then do you have any
>>> pointers to fix it?
>>>
>>>  Map<String, String> kafkaConf = new HashMap<String, String>();
>>>  kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>>  kafkaConf.put("group.id", kafkaConsumerGroup);
>>>  kafkaConf.put("consumer.timeout.ms", "30000");
>>>  kafkaConf.put("auto.offset.reset", "largest");
>>>  kafkaConf.put("fetch.message.max.bytes", "20000000");
>>>  kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>>  kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>>  kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>>  kafkaConf.put("rebalance.backoff.ms", "10000");
>>>  kafkaConf.put("rebalance.max.retries", "20");
>>>
>>> --
>>> Thanks & Regards,
>>>
>>> Mukesh Jha <me...@gmail.com>
>>>
>>
>>
>


-- 


Thanks & Regards,

*Mukesh Jha <me...@gmail.com>*

Re: KafkaUtils not consuming all the data from all partitions

Posted by Gerard Maas <ge...@gmail.com>.
Hi,

Could you add the code where you create the Kafka consumer?

-kr, Gerard.

On Wed, Jan 7, 2015 at 3:43 PM, <fr...@typesafe.com> wrote:

> Hi Mukesh,
>
> If my understanding is correct, each Stream only has a single Receiver.
> So, if you have each receiver consuming 9 partitions, you need 10 input
> DStreams to create 10 concurrent receivers:
>
>
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>
> Would you mind sharing a bit more on how you achieve this ?
>
> —
> FG
>
>
> On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha <me...@gmail.com>
> wrote:
>
>> Hi Guys,
>>
>> I have a kafka topic having 90 partitions and I running
>> SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
>> kafka-receivers.
>>
>> My streaming is running fine and there is no delay in processing, just
>> that some partitions data is never getting picked up. From the kafka
>> console I can see that each receiver is consuming data from 9 partitions
>> but the lag for some offsets keeps on increasing.
>>
>> Below is my kafka-consumers parameters.
>>
>> Any of you have face this kind of issue, if so then do you have any
>> pointers to fix it?
>>
>>  Map<String, String> kafkaConf = new HashMap<String, String>();
>>  kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>  kafkaConf.put("group.id", kafkaConsumerGroup);
>>  kafkaConf.put("consumer.timeout.ms", "30000");
>>  kafkaConf.put("auto.offset.reset", "largest");
>>  kafkaConf.put("fetch.message.max.bytes", "20000000");
>>  kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>  kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>  kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>  kafkaConf.put("rebalance.backoff.ms", "10000");
>>  kafkaConf.put("rebalance.max.retries", "20");
>>
>> --
>> Thanks & Regards,
>>
>> Mukesh Jha <me...@gmail.com>
>>
>
>

Re: KafkaUtils not consuming all the data from all partitions

Posted by Gerard Maas <ge...@gmail.com>.
Hi,

Could you add the code where you create the Kafka consumer?

-kr, Gerard.

On Wed, Jan 7, 2015 at 3:43 PM, <fr...@typesafe.com> wrote:

> Hi Mukesh,
>
> If my understanding is correct, each Stream only has a single Receiver.
> So, if you have each receiver consuming 9 partitions, you need 10 input
> DStreams to create 10 concurrent receivers:
>
>
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>
> Would you mind sharing a bit more on how you achieve this ?
>
> —
> FG
>
>
> On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha <me...@gmail.com>
> wrote:
>
>> Hi Guys,
>>
>> I have a kafka topic having 90 partitions and I running
>> SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
>> kafka-receivers.
>>
>> My streaming is running fine and there is no delay in processing, just
>> that some partitions data is never getting picked up. From the kafka
>> console I can see that each receiver is consuming data from 9 partitions
>> but the lag for some offsets keeps on increasing.
>>
>> Below is my kafka-consumers parameters.
>>
>> Any of you have face this kind of issue, if so then do you have any
>> pointers to fix it?
>>
>>  Map<String, String> kafkaConf = new HashMap<String, String>();
>>  kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>  kafkaConf.put("group.id", kafkaConsumerGroup);
>>  kafkaConf.put("consumer.timeout.ms", "30000");
>>  kafkaConf.put("auto.offset.reset", "largest");
>>  kafkaConf.put("fetch.message.max.bytes", "20000000");
>>  kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>  kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>  kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>  kafkaConf.put("rebalance.backoff.ms", "10000");
>>  kafkaConf.put("rebalance.max.retries", "20");
>>
>> --
>> Thanks & Regards,
>>
>> Mukesh Jha <me...@gmail.com>
>>
>
>

Re: KafkaUtils not consuming all the data from all partitions

Posted by fr...@typesafe.com.
Hi Mukesh,




If my understanding is correct, each Stream only has a single Receiver. So, if you have each receiver consuming 9 partitions, you need 10 input DStreams to create 10 concurrent receivers:




https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving





Would you mind sharing a bit more on how you achieve this ?


—
FG

On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha <me...@gmail.com>
wrote:

> Hi Guys,
> I have a kafka topic having 90 partitions and I running
> SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
> kafka-receivers.
> My streaming is running fine and there is no delay in processing, just that
> some partitions data is never getting picked up. From the kafka console I
> can see that each receiver is consuming data from 9 partitions but the lag
> for some offsets keeps on increasing.
> Below is my kafka-consumers parameters.
> Any of you have face this kind of issue, if so then do you have any
> pointers to fix it?
> Map<String, String> kafkaConf = new HashMap<String, String>();
> kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
> kafkaConf.put("group.id", kafkaConsumerGroup);
> kafkaConf.put("consumer.timeout.ms", "30000");
> kafkaConf.put("auto.offset.reset", "largest");
> kafkaConf.put("fetch.message.max.bytes", "20000000");
> kafkaConf.put("zookeeper.session.timeout.ms", "6000");
> kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
> kafkaConf.put("zookeeper.sync.time.ms", "2000");
> kafkaConf.put("rebalance.backoff.ms", "10000");
> kafkaConf.put("rebalance.max.retries", "20");
> -- 
> Thanks & Regards,
> *Mukesh Jha <me...@gmail.com>*

Re: KafkaUtils not consuming all the data from all partitions

Posted by fr...@typesafe.com.
Hi Mukesh,




If my understanding is correct, each Stream only has a single Receiver. So, if you have each receiver consuming 9 partitions, you need 10 input DStreams to create 10 concurrent receivers:




https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving





Would you mind sharing a bit more on how you achieve this ?


—
FG

On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha <me...@gmail.com>
wrote:

> Hi Guys,
> I have a kafka topic having 90 partitions and I running
> SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
> kafka-receivers.
> My streaming is running fine and there is no delay in processing, just that
> some partitions data is never getting picked up. From the kafka console I
> can see that each receiver is consuming data from 9 partitions but the lag
> for some offsets keeps on increasing.
> Below is my kafka-consumers parameters.
> Any of you have face this kind of issue, if so then do you have any
> pointers to fix it?
> Map<String, String> kafkaConf = new HashMap<String, String>();
> kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
> kafkaConf.put("group.id", kafkaConsumerGroup);
> kafkaConf.put("consumer.timeout.ms", "30000");
> kafkaConf.put("auto.offset.reset", "largest");
> kafkaConf.put("fetch.message.max.bytes", "20000000");
> kafkaConf.put("zookeeper.session.timeout.ms", "6000");
> kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
> kafkaConf.put("zookeeper.sync.time.ms", "2000");
> kafkaConf.put("rebalance.backoff.ms", "10000");
> kafkaConf.put("rebalance.max.retries", "20");
> -- 
> Thanks & Regards,
> *Mukesh Jha <me...@gmail.com>*