Posted to user@spark.apache.org by richiesgr <ri...@gmail.com> on 2014/09/10 16:16:50 UTC

How to scale more consumer to Kafka stream

Hi (my previous post has been used by someone else)

I'm building an application that reads events from a Kafka stream. In
production we have 5 consumers that share 10 partitions.
But with Spark Streaming's Kafka support only 1 worker acts as a consumer and
then distributes the tasks to the other workers, so I can have only 1 machine
acting as a consumer. I need more, because only 1 consumer means lag.
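
As a rough illustration of what "5 consumers share 10 partitions" means, here is a small Python sketch of range-style partition assignment. It is a simplified model and not Kafka's actual code; the function name `range_assign` and the consumer ids are made up for the example.

```python
def range_assign(num_partitions, consumers):
    """Split partitions 0..num_partitions-1 into contiguous ranges,
    one range per consumer (simplified model of range assignment)."""
    if not consumers:
        raise ValueError("need at least one consumer")
    ordered = sorted(consumers)
    per_consumer, extra = divmod(num_partitions, len(ordered))
    assignment = {}
    start = 0
    for index, consumer in enumerate(ordered):
        # The first `extra` consumers take one additional partition.
        count = per_consumer + (1 if index < extra else 0)
        assignment[consumer] = list(range(start, start + count))
        start += count
    return assignment


if __name__ == "__main__":
    # Five consumers: each one owns two of the ten partitions.
    print(range_assign(10, ["c1", "c2", "c3", "c4", "c5"]))
    # A single consumer: it has to read all ten partitions by itself.
    print(range_assign(10, ["only-receiver"]))
```

With a single consumer every partition lands on the same machine, which is the situation described above.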

Do you have any idea what I can do? Another interesting point: the master
is not loaded at all; I can't get it above 10% CPU.

I tried increasing queued.max.message.chunks on the Kafka client to read
more records, thinking it would speed up the reads, but I only get:

ERROR consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372],
Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73; ClientId:
SparkEC2-ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372;
ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [IA2,7] ->
PartitionFetchInfo(929838589,1048576),[IA2,6] ->
PartitionFetchInfo(929515796,1048576),[IA2,9] ->
PartitionFetchInfo(929577946,1048576),[IA2,8] ->
PartitionFetchInfo(930751599,1048576),[IA2,2] ->
PartitionFetchInfo(926457704,1048576),[IA2,5] ->
PartitionFetchInfo(930774385,1048576),[IA2,0] ->
PartitionFetchInfo(929913213,1048576),[IA2,3] ->
PartitionFetchInfo(929268891,1048576),[IA2,4] ->
PartitionFetchInfo(929949877,1048576),[IA2,1] ->
PartitionFetchInfo(930063114,1048576) 
java.lang.OutOfMemoryError: Java heap space 
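
A back-of-the-envelope check of the numbers in this log, as a Python sketch. The 10 partitions and the 1048576-byte fetch size are taken from the FetchRequest above. The reading of queued.max.message.chunks as "number of chunks buffered per consumer queue, each up to one fetch size" is an assumption based on the Kafka 0.8 consumer configuration docs, and the chunk count used in the example is hypothetical, not the value actually configured.

```python
FETCH_BYTES_PER_PARTITION = 1_048_576  # second field of PartitionFetchInfo in the log
PARTITIONS_IN_REQUEST = 10             # [IA2,0] through [IA2,9]


def max_fetch_response_bytes(partitions, fetch_bytes):
    """Upper bound on one fetch response: every partition returns a full chunk."""
    return partitions * fetch_bytes


def max_buffered_bytes(queued_chunks, fetch_bytes, queues=1):
    """Upper bound on data waiting in consumer queues (assumed model, see lead-in)."""
    return queued_chunks * fetch_bytes * queues


def to_mib(num_bytes):
    return num_bytes / (1024 * 1024)


if __name__ == "__main__":
    one_fetch = max_fetch_response_bytes(PARTITIONS_IN_REQUEST, FETCH_BYTES_PER_PARTITION)
    print("one fetch response, worst case: %.0f MiB" % to_mib(one_fetch))
    # Hypothetical setting: 1000 queued chunks on a single queue.
    buffered = max_buffered_bytes(1000, FETCH_BYTES_PER_PARTITION)
    print("buffered chunks, worst case: %.0f MiB" % to_mib(buffered))
```

Under this model, raising the number of queued chunks raises the worst-case heap needed by the single consumer, which would be consistent with the OutOfMemoryError.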

Does anyone have ideas?
Thanks



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-scale-more-consumer-to-Kafka-stream-tp13883.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: How to scale more consumer to Kafka stream

Posted by richiesgr <ri...@gmail.com>.
Thanks to all.
I'm going to check both solutions.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-scale-more-consumer-to-Kafka-stream-tp13883p13959.html


Re: How to scale more consumer to Kafka stream

Posted by Dibyendu Bhattacharya <di...@gmail.com>.
Hi,

You can use this Kafka Spark Consumer.
https://github.com/dibbhatt/kafka-spark-consumer

It does exactly that. It creates parallel receivers for every Kafka
topic partition. See Consumer.java under the consumer.kafka.client
package for an example of how to use it.

There is some discussion of this consumer here:
https://mail.google.com/mail/u/1/?tab=wm#search/kafka+spark+consumer/14797b2cbbaa8689

Regards,
Dib


On Wed, Sep 10, 2014 at 11:47 PM, Tim Smith <se...@gmail.com> wrote:

> How are you creating your kafka streams in Spark?
>
> If you have 10 partitions for a topic, you can call "createStream" ten
> times to create 10 parallel receivers/executors and then use "union" to
> combine all the dStreams.

Re: How to scale more consumer to Kafka stream

Posted by Dibyendu Bhattacharya <di...@gmail.com>.
I agree, Gerard. Thanks for pointing this out.

Dib

On Thu, Sep 11, 2014 at 5:28 PM, Gerard Maas <ge...@gmail.com> wrote:

> This pattern works.
>
> One note, though: Use 'union' only if you need to group the data from all
> RDDs into one RDD for processing (for example a count distinct or a groupBy).
> If your process can be parallelized over every stream of incoming data, I
> suggest you just apply the required transformations on every dstream and
> avoid 'union' altogether.
>
> -kr, Gerard.

Re: How to scale more consumer to Kafka stream

Posted by Gerard Maas <ge...@gmail.com>.
This pattern works.

One note, though: Use 'union' only if you need to group the data from all
RDDs into one RDD for processing (for example a count distinct or a groupBy).
If your process can be parallelized over every stream of incoming data, I
suggest you just apply the required transformations on every dstream and
avoid 'union' altogether.
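
A small plain-Python illustration of that distinction, with lists standing in for the batches received by three receivers. The data and variable names are made up and no Spark is involved. A per-record transformation or a record count can be done on each stream separately, while a distinct count needs all the data in one place, which is what 'union' provides.

```python
from itertools import chain

# Hypothetical batches, one list per receiver.
receiver_batches = [["a", "b", "a"], ["b", "c"], ["a", "d"]]

# Work that parallelizes per stream: no union needed.
upper_per_stream = [[event.upper() for event in batch] for batch in receiver_batches]
count_per_stream = [len(batch) for batch in receiver_batches]
total_count = sum(count_per_stream)  # partial counts can simply be added

# Distinct counts computed per stream cannot simply be added up,
# because the same key may show up on several streams.
naive_distinct = sum(len(set(batch)) for batch in receiver_batches)

# Grouping everything first (the role 'union' plays) gives the right answer.
unioned = list(chain.from_iterable(receiver_batches))
true_distinct = len(set(unioned))

if __name__ == "__main__":
    print(total_count)     # 7
    print(naive_distinct)  # 6, overcounts "a" and "b"
    print(true_distinct)   # 4
```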

-kr, Gerard.




Re: How to scale more consumer to Kafka stream

Posted by Tim Smith <se...@gmail.com>.
How are you creating your kafka streams in Spark?

If you have 10 partitions for a topic, you can call "createStream" ten
times to create 10 parallel receivers/executors and then use "union" to
combine all the dStreams.
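
The pattern can be sketched in Python roughly as follows. Assumptions: `create_stream` is passed in as an argument so the snippet runs without a Spark installation; in a real job it would be something like `KafkaUtils.createStream` from `pyspark.streaming.kafka` (a Python API that, as far as I know, only exists in later Spark releases), and `ssc` would be a `StreamingContext`, whose `union` method accepts several DStreams. The function and argument names are hypothetical.

```python
def union_of_receivers(ssc, create_stream, zk_quorum, group_id, topic, num_receivers):
    """Create `num_receivers` Kafka input streams and combine them into one.

    `create_stream` is expected to behave like
    KafkaUtils.createStream(ssc, zkQuorum, groupId, topics).
    """
    streams = [
        # {topic: 1} asks for one consumer thread for this topic per receiver.
        create_stream(ssc, zk_quorum, group_id, {topic: 1})
        for _ in range(num_receivers)
    ]
    # All receivers use the same group id, so Kafka spreads the topic's
    # partitions across them instead of delivering every record to each one.
    return ssc.union(*streams)
```

For the topic in this thread that would be ten calls, one per partition. Each receiver occupies a core on the cluster, so there need to be more cores than receivers for the processing itself to run.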


