Posted to user@spark.apache.org by Tobias Pfeiffer <tg...@preferred.jp> on 2014/06/28 06:49:04 UTC

Distribute data from Kafka evenly on cluster

Hi,

I have a number of questions using the Kafka receiver of Spark
Streaming. Maybe someone has some more experience with that and can
help me out.

I have set up an environment for getting to know Spark, consisting of
- a Mesos cluster with 3 slave-only and 3 combined master/slave nodes,
- 2 Kafka nodes,
- 3 Zookeeper nodes providing service to both Kafka and Mesos.

My Kafka cluster has only one topic with one partition (replicated to
both nodes). When I start my Kafka receiver, it successfully connects
to Kafka and does the processing, but it seems as if the (expensive)
function in the final foreachRDD(...) is only executed on one node of
my cluster, which is not what I had in mind when setting up the
cluster ;-)

So first, I was wondering about the parameter `topics: Map[String,
Int]` to KafkaUtils.createStream(). Apparently it controls how many
connections are made from my cluster nodes to Kafka. The Kafka doc at
https://kafka.apache.org/documentation.html#introduction says "each
message published to a topic is delivered to one consumer instance
within each subscribing consumer group" and "If all the consumer
instances have the same consumer group, then this works just like a
traditional queue balancing load over the consumers."

The Kafka docs *also* say: "Note however that there cannot be more
consumer instances than partitions." This seems to imply that with
only one partition, increasing the number in my Map should have no
effect.
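
To make the question concrete, here is a minimal sketch of how I create
the receiver (the quorum string, group id and topic name are placeholders);
the Int value in the Map is the number I am referring to:

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.KafkaUtils

  val conf = new SparkConf().setAppName("kafka-receiver-sketch")
  val ssc = new StreamingContext(conf, Seconds(10))
  val zkQuorum = "zk1:2181,zk2:2181,zk3:2181"  // placeholder
  val groupId  = "my-consumer-group"           // placeholder
  // the value per topic (here 1) is the number of consumer threads
  // used for that topic within this one receiver
  val stream =
    KafkaUtils.createStream(ssc, zkQuorum, groupId, Map("mytopic" -> 1))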

However, if I increase the number of streams for my one topic in my
`topics` Map, I actually *do* see that the task in my foreachRDD(...)
call is now executed on multiple nodes. Maybe it's more of a Kafka
question than a Spark one, but can anyone explain this to me? Should I
always have more Kafka partitions than Mesos cluster nodes?

So, assuming that changing the number in that Map is not what I want
(although I don't know if it is), I tried to use
.repartition(numOfClusterNodes) (which doesn't seem right if I want to
add and remove Mesos nodes on demand). This *also* did spread the
foreachRDD(...) action evenly – however, the function never seems to
terminate, so I never get to process the next interval in the stream.
A similar behavior can be observed when running locally rather than on
the cluster: the program does not exit but instead hangs after
everything else has shut down. Any hints concerning this issue?
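
For reference, a minimal sketch of the repartition variant, continuing
from the stream created above (process() is just a placeholder for the
expensive function, and numOfClusterNodes matches my 6 slave nodes):

  // placeholder for the expensive per-record work
  def process(record: (String, String)): Unit = {
    // ...
  }

  val numOfClusterNodes = 6  // number of Mesos slave nodes in my setup

  stream
    .repartition(numOfClusterNodes)  // shuffle records across the cluster
    .foreachRDD { rdd =>
      rdd.foreach(record => process(record))  // runs on the executors
    }

  ssc.start()
  ssc.awaitTermination()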

Thanks
Tobias

Re: Distribute data from Kafka evenly on cluster

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi,

as far as I know, a rebalance is triggered by Kafka in order to distribute
partitions evenly, that is, to achieve the opposite of what you are seeing.
It would be interesting to check the Kafka logs for the result of the
rebalance operation to understand why you see this behavior. I know that
the client logs state which partitions of a topic were assigned to a
particular consumer, so maybe you can have a look there.

Tobias


On Fri, Jul 18, 2014 at 11:42 PM, Chen Song <ch...@gmail.com> wrote:

> Speaking of this, I have another related question.
>
> In my Spark Streaming job, I set up multiple consumers to receive data
> from Kafka, with each worker reading from one partition.
>
> Initially, Spark is intelligent enough to associate each worker with one
> partition, so data consumption is distributed. After running for a while,
> the consumers rebalance themselves and some workers start reading partitions
> that were previously assigned to others. This leads to a situation where
> some workers read from multiple partitions and some don't read at all.
> Because of the data volume, this causes heap pressure on some workers.
>
> Any thoughts on why rebalance is triggered and how to monitor to avoid
> that?
>
>
>
>
> On Fri, Jul 4, 2014 at 11:11 AM, Tobias Pfeiffer <tg...@preferred.jp> wrote:
>
>> Hi,
>>
>> unfortunately, when I go with the above approach, I run into this problem:
>>
>> http://mail-archives.apache.org/mod_mbox/kafka-users/201401.mbox/%3CCABtFeVyxVtAqVnmVwmH7ysCfGXpW5KmrNw_Gnq72Cy4Oa1bKTA@mail.gmail.com%3E
>> That is, a NoNode error in Zookeeper when rebalancing. The Kafka
>> receiver will retry again and again, but will eventually fail, leading to
>> unprocessed data and, worse, the task never terminating. There is nothing
>> exotic about my setup; one Zookeeper node, one Kafka broker, so I am
>> wondering if other people have seen this error before and, more importantly,
>> how to fix it. When I don't use the approach of multiple kafkaStreams, I
>> don't get this error, but also work is never distributed in my cluster...
>>
>> Thanks
>> Tobias
>>
>>
>> On Thu, Jul 3, 2014 at 11:58 AM, Tobias Pfeiffer <tg...@preferred.jp>
>> wrote:
>>
>>> Thank you very much for the link, that was very helpful!
>>>
>>> So, apparently the `topics: Map[String, Int]` parameter controls the
>>> number of partitions that the data is initially added to; the number N in
>>>
>>>   val kafkaInputs = (1 to N).map { _ =>
>>>     ssc.kafkaStream(zkQuorum, groupId, Map("topic" -> 1))
>>>   }
>>>   val union = ssc.union(kafkaInputs)
>>>
>>> controls how many connections are made to Kafka. Note that the number of
>>> Kafka partitions for that topic must be at least N for this to work.
>>>
>>> Thanks
>>> Tobias
>>>
>>
>>
>
>
> --
> Chen Song
>
>

Re: Distribute data from Kafka evenly on cluster

Posted by Chen Song <ch...@gmail.com>.
Speaking of this, I have another related question.

In my Spark Streaming job, I set up multiple consumers to receive data from
Kafka, with each worker reading from one partition.

Initially, Spark is intelligent enough to associate each worker with one
partition, so data consumption is distributed. After running for a while,
the consumers rebalance themselves and some workers start reading partitions
that were previously assigned to others. This leads to a situation where
some workers read from multiple partitions and some don't read at all.
Because of the data volume, this causes heap pressure on some workers.

Any thoughts on why rebalance is triggered and how to monitor to avoid that?




On Fri, Jul 4, 2014 at 11:11 AM, Tobias Pfeiffer <tg...@preferred.jp> wrote:

> Hi,
>
> unfortunately, when I go with the above approach, I run into this problem:
>
> http://mail-archives.apache.org/mod_mbox/kafka-users/201401.mbox/%3CCABtFeVyxVtAqVnmVwmH7ysCfGXpW5KmrNw_Gnq72Cy4Oa1bKTA@mail.gmail.com%3E
> That is, a NoNode error in Zookeeper when rebalancing. The Kafka receiver
> will retry again and again, but will eventually fail, leading to
> unprocessed data and, worse, the task never terminating. There is nothing
> exotic about my setup; one Zookeeper node, one Kafka broker, so I am
> wondering if other people have seen this error before and, more importantly,
> how to fix it. When I don't use the approach of multiple kafkaStreams, I
> don't get this error, but also work is never distributed in my cluster...
>
> Thanks
> Tobias
>
>
> On Thu, Jul 3, 2014 at 11:58 AM, Tobias Pfeiffer <tg...@preferred.jp> wrote:
>
>> Thank you very much for the link, that was very helpful!
>>
>> So, apparently the `topics: Map[String, Int]` parameter controls the
>> number of partitions that the data is initially added to; the number N in
>>
>>   val kafkaInputs = (1 to N).map { _ =>
>>     ssc.kafkaStream(zkQuorum, groupId, Map("topic" -> 1))
>>   }
>>   val union = ssc.union(kafkaInputs)
>>
>> controls how many connections are made to Kafka. Note that the number of
>> Kafka partitions for that topic must be at least N for this to work.
>>
>> Thanks
>> Tobias
>>
>
>


-- 
Chen Song

Re: Distribute data from Kafka evenly on cluster

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi,

unfortunately, when I go with the above approach, I run into this problem:

http://mail-archives.apache.org/mod_mbox/kafka-users/201401.mbox/%3CCABtFeVyxVtAqVnmVwmH7ysCfGXpW5KmrNw_Gnq72Cy4Oa1bKTA@mail.gmail.com%3E
That is, a NoNode error in Zookeeper when rebalancing. The Kafka receiver
will retry again and again, but will eventually fail, leading to
unprocessed data and, worse, the task never terminating. There is nothing
exotic about my setup; one Zookeeper node, one Kafka broker, so I am
wondering if other people have seen this error before and, more importantly,
how to fix it. When I don't use the approach of multiple kafkaStreams, I
don't get this error, but also work is never distributed in my cluster...

Thanks
Tobias


On Thu, Jul 3, 2014 at 11:58 AM, Tobias Pfeiffer <tg...@preferred.jp> wrote:

> Thank you very much for the link, that was very helpful!
>
> So, apparently the `topics: Map[String, Int]` parameter controls the
> number of partitions that the data is initially added to; the number N in
>
>   val kafkaInputs = (1 to N).map { _ =>
>     ssc.kafkaStream(zkQuorum, groupId, Map("topic" -> 1))
>   }
>   val union = ssc.union(kafkaInputs)
>
> controls how many connections are made to Kafka. Note that the number of
> Kafka partitions for that topic must be at least N for this to work.
>
> Thanks
> Tobias
>

Re: Distribute data from Kafka evenly on cluster

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Thank you very much for the link, that was very helpful!

So, apparently the `topics: Map[String, Int]` parameter controls the number
of partitions that the data is initially added to; the number N in

  val kafkaInputs = (1 to N).map { _ =>
    ssc.kafkaStream(zkQuorum, groupId, Map("topic" -> 1))
  }
  val union = ssc.union(kafkaInputs)

controls how many connections are made to Kafka. Note that the number of
Kafka partitions for that topic must be at least N for this to work.

Thanks
Tobias

Re: Distribute data from Kafka evenly on cluster

Posted by Mayur Rustagi <ma...@gmail.com>.
how about this?
https://groups.google.com/forum/#!topic/spark-users/ntPQUZFJt4M


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Sat, Jun 28, 2014 at 10:19 AM, Tobias Pfeiffer <tg...@preferred.jp> wrote:

> Hi,
>
> I have a number of questions using the Kafka receiver of Spark
> Streaming. Maybe someone has some more experience with that and can
> help me out.
>
> I have set up an environment for getting to know Spark, consisting of
> - a Mesos cluster with 3 slave-only and 3 combined master/slave nodes,
> - 2 Kafka nodes,
> - 3 Zookeeper nodes providing service to both Kafka and Mesos.
>
> My Kafka cluster has only one topic with one partition (replicated to
> both nodes). When I start my Kafka receiver, it successfully connects
> to Kafka and does the processing, but it seems as if the (expensive)
> function in the final foreachRDD(...) is only executed on one node of
> my cluster, which is not what I had in mind when setting up the
> cluster ;-)
>
> So first, I was wondering about the parameter `topics: Map[String,
> Int]` to KafkaUtils.createStream(). Apparently it controls how many
> connections are made from my cluster nodes to Kafka. The Kafka doc at
> https://kafka.apache.org/documentation.html#introduction says "each
> message published to a topic is delivered to one consumer instance
> within each subscribing consumer group" and "If all the consumer
> instances have the same consumer group, then this works just like a
> traditional queue balancing load over the consumers."
>
> The Kafka docs *also* say: "Note however that there cannot be more
> consumer instances than partitions." This seems to imply that with
> only one partition, increasing the number in my Map should have no
> effect.
>
> However, if I increase the number of streams for my one topic in my
> `topics` Map, I actually *do* see that the task in my foreachRDD(...)
> call is now executed on multiple nodes. Maybe it's more of a Kafka
> question than a Spark one, but can anyone explain this to me? Should I
> always have more Kafka partitions than Mesos cluster nodes?
>
> So, assuming that changing the number in that Map is not what I want
> (although I don't know if it is), I tried to use
> .repartition(numOfClusterNodes) (which doesn't seem right if I want to
> add and remove Mesos nodes on demand). This *also* did spread the
> foreachRDD(...) action evenly – however, the function never seems to
> terminate, so I never get to process the next interval in the stream.
> A similar behavior can be observed when running locally rather than on
> the cluster: the program does not exit but instead hangs after
> everything else has shut down. Any hints concerning this issue?
>
> Thanks
> Tobias
>