You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Shushant Arora <sh...@gmail.com> on 2015/07/11 12:00:10 UTC

spark streaming doubt

1.spark streaming 1.3 creates as many RDD Partitions as there are kafka
partitions in topic. Say I have 300 partitions in topic and 10 executors
and each with 3 cores so , is it means at a time only 10*3=30 partitions
are processed and then 30 like that since executors launch tasks per RDD
partitions , so I need in total; 300 tasks but since I have 30 cores(10
executors each with 3 cores) so these tasks will execute 30 after 30 till
300.

So reducing no of kafka paartitions to say 100 will speed up the processing?

2.In spark streaming job when I processed the kafka stream using foreachRDD

directKafkaStream.foreachRDD(new function( public void call(  vi)){
v1.foreachPartition(new function(){public void call(){
//..process partition
}})

});

since foreachRDD is operation so it spawns spark job but these jobs are not
coming on driver console like in map and print function as

1.spark streaming 1.3 creates as many RDD Partitions as there are kafka
partitions in topic. Say I have 300 partitions in topic and 10 executors
and each with 3 cores so , is it means at a time only 10*3=30 partitions
are processed and then 30 like that since executors launch tasks per RDD
partitions , so I need in total; 300 tasks but since I have 30 cores(10
executors each with 3 cores) so these tasks will execute 30 after 30 till
300.

So reducing no of kafka paartitions to say 100 will speed up the processing?

2.In spark streaming job when I processed the kafka stream using foreachRDD

directKafkaStream.foreachRDD(new function( public void call(  vi)){
v1.foreachPartition(new function(){public void call(){
//..process partition
}})

});

since foreachRDD is operation so it spawns spark job but these jobs timings
are not coming on driver console like in map and print function as


-------------------------------------------
Time: 1429054870000 ms
-------------------------------------------
------------------------------------------
Time: 1429054871000 ms
-------------------------------------------

..................

Why is it so?


Thanks
Shushant

Re: spark streaming doubt

Posted by Shushant Arora <sh...@gmail.com>.
For second question

I am comparing 2 situtations of processing kafkaRDD.

case I - When I used foreachPartition to process kafka stream I am not able
to see any stream job timing interval like Time: 1429054870000 ms .
displayed on driver console at start of each stream batch. But it processed
each RDD and on webUI it showed jobs got created at each batch interval of
1 sec.

Case 2 -When I called mapPartition on kafkaStream RDD and then called any
action (say print()) at end of each stream interval I am getting on driver
console jobs getting created with batch interval Time: 1429054870000 ms ..

Why in case-I no information comes on driver console?

Thanks
Shushant




On Mon, Jul 13, 2015 at 7:22 PM, Cody Koeninger <co...@koeninger.org> wrote:

> Regarding your first question, having more partitions than you do
> executors usually means you'll have better utilization, because the
> workload will be distributed more evenly.  There's some degree of per-task
> overhead, but as long as you don't have a huge imbalance between number of
> tasks and number of executors that shouldn't be a large problem.
>
> I don't really understand your second question.
>
> On Sat, Jul 11, 2015 at 5:00 AM, Shushant Arora <shushantarora09@gmail.com
> > wrote:
>
>> 1.spark streaming 1.3 creates as many RDD Partitions as there are kafka
>> partitions in topic. Say I have 300 partitions in topic and 10 executors
>> and each with 3 cores so , is it means at a time only 10*3=30 partitions
>> are processed and then 30 like that since executors launch tasks per RDD
>> partitions , so I need in total; 300 tasks but since I have 30 cores(10
>> executors each with 3 cores) so these tasks will execute 30 after 30 till
>> 300.
>>
>> So reducing no of kafka paartitions to say 100 will speed up the
>> processing?
>>
>> 2.In spark streaming job when I processed the kafka stream using
>> foreachRDD
>>
>> directKafkaStream.foreachRDD(new function( public void call(  vi)){
>> v1.foreachPartition(new function(){public void call(){
>> //..process partition
>> }})
>>
>> });
>>
>> since foreachRDD is operation so it spawns spark job but these jobs are
>> not coming on driver console like in map and print function as
>>
>> 1.spark streaming 1.3 creates as many RDD Partitions as there are kafka
>> partitions in topic. Say I have 300 partitions in topic and 10 executors
>> and each with 3 cores so , is it means at a time only 10*3=30 partitions
>> are processed and then 30 like that since executors launch tasks per RDD
>> partitions , so I need in total; 300 tasks but since I have 30 cores(10
>> executors each with 3 cores) so these tasks will execute 30 after 30 till
>> 300.
>>
>> So reducing no of kafka paartitions to say 100 will speed up the
>> processing?
>>
>> 2.In spark streaming job when I processed the kafka stream using
>> foreachRDD
>>
>> directKafkaStream.foreachRDD(new function( public void call(  vi)){
>> v1.foreachPartition(new function(){public void call(){
>> //..process partition
>> }})
>>
>> });
>>
>> since foreachRDD is operation so it spawns spark job but these jobs
>> timings are not coming on driver console like in map and print function as
>>
>>
>> -------------------------------------------
>> Time: 1429054870000 ms
>> -------------------------------------------
>> ------------------------------------------
>> Time: 1429054871000 ms
>> -------------------------------------------
>>
>> ..................
>>
>> Why is it so?
>>
>>
>> Thanks
>> Shushant
>>
>>
>>
>>
>>
>>
>

Re: spark streaming doubt

Posted by Aniruddh Sharma <as...@gmail.com>.
Hi Sushant/Cody,

For question 1 , following is my understanding ( I am not 100% sure and
this is only my understanding, I have asked this question in another words
to TD for confirmation which is not confirmed as of now).

Following is my understanding. In accordance with tasks created in
proportion to partitions of data the main goal is to try to parallelize
execution of tasks which is dependent on number of simultaneous threads
created in executor JVM) and number of threads to be created is controlled
by User of program and User has to set this number in accordance with
number of physical cores. In Yarn number of threads should be
numDefaultPartitions*executor-cores  (which is user supplied). For example
if you have 3 physical cores in your machine then might be you can create
alteast 6 threads by passing executor-cores = 6 assuming your
numDefaultPartitions is set to 1. Then what it should do is each executor
should execute concurrent 6 tasks (rather than 3 which was number of
physical cores) and when these 6 tasks finish then it should execute
another 6 and so on. (Please note: Again this is my understanding how Spark
works and I may be wrong).

Thanks and Regards
Aniruddh

On Mon, Jul 13, 2015 at 7:22 PM, Cody Koeninger <co...@koeninger.org> wrote:

> Regarding your first question, having more partitions than you do
> executors usually means you'll have better utilization, because the
> workload will be distributed more evenly.  There's some degree of per-task
> overhead, but as long as you don't have a huge imbalance between number of
> tasks and number of executors that shouldn't be a large problem.
>
> I don't really understand your second question.
>
> On Sat, Jul 11, 2015 at 5:00 AM, Shushant Arora <shushantarora09@gmail.com
> > wrote:
>
>> 1.spark streaming 1.3 creates as many RDD Partitions as there are kafka
>> partitions in topic. Say I have 300 partitions in topic and 10 executors
>> and each with 3 cores so , is it means at a time only 10*3=30 partitions
>> are processed and then 30 like that since executors launch tasks per RDD
>> partitions , so I need in total; 300 tasks but since I have 30 cores(10
>> executors each with 3 cores) so these tasks will execute 30 after 30 till
>> 300.
>>
>> So reducing no of kafka paartitions to say 100 will speed up the
>> processing?
>>
>> 2.In spark streaming job when I processed the kafka stream using
>> foreachRDD
>>
>> directKafkaStream.foreachRDD(new function( public void call(  vi)){
>> v1.foreachPartition(new function(){public void call(){
>> //..process partition
>> }})
>>
>> });
>>
>> since foreachRDD is operation so it spawns spark job but these jobs are
>> not coming on driver console like in map and print function as
>>
>> 1.spark streaming 1.3 creates as many RDD Partitions as there are kafka
>> partitions in topic. Say I have 300 partitions in topic and 10 executors
>> and each with 3 cores so , is it means at a time only 10*3=30 partitions
>> are processed and then 30 like that since executors launch tasks per RDD
>> partitions , so I need in total; 300 tasks but since I have 30 cores(10
>> executors each with 3 cores) so these tasks will execute 30 after 30 till
>> 300.
>>
>> So reducing no of kafka paartitions to say 100 will speed up the
>> processing?
>>
>> 2.In spark streaming job when I processed the kafka stream using
>> foreachRDD
>>
>> directKafkaStream.foreachRDD(new function( public void call(  vi)){
>> v1.foreachPartition(new function(){public void call(){
>> //..process partition
>> }})
>>
>> });
>>
>> since foreachRDD is operation so it spawns spark job but these jobs
>> timings are not coming on driver console like in map and print function as
>>
>>
>> -------------------------------------------
>> Time: 1429054870000 ms
>> -------------------------------------------
>> ------------------------------------------
>> Time: 1429054871000 ms
>> -------------------------------------------
>>
>> ..................
>>
>> Why is it so?
>>
>>
>> Thanks
>> Shushant
>>
>>
>>
>>
>>
>>
>

Re: spark streaming doubt

Posted by Cody Koeninger <co...@koeninger.org>.
Regarding your first question, having more partitions than you do executors
usually means you'll have better utilization, because the workload will be
distributed more evenly.  There's some degree of per-task overhead, but as
long as you don't have a huge imbalance between number of tasks and number
of executors that shouldn't be a large problem.

I don't really understand your second question.

On Sat, Jul 11, 2015 at 5:00 AM, Shushant Arora <sh...@gmail.com>
wrote:

> 1.spark streaming 1.3 creates as many RDD Partitions as there are kafka
> partitions in topic. Say I have 300 partitions in topic and 10 executors
> and each with 3 cores so , is it means at a time only 10*3=30 partitions
> are processed and then 30 like that since executors launch tasks per RDD
> partitions , so I need in total; 300 tasks but since I have 30 cores(10
> executors each with 3 cores) so these tasks will execute 30 after 30 till
> 300.
>
> So reducing no of kafka paartitions to say 100 will speed up the
> processing?
>
> 2.In spark streaming job when I processed the kafka stream using foreachRDD
>
> directKafkaStream.foreachRDD(new function( public void call(  vi)){
> v1.foreachPartition(new function(){public void call(){
> //..process partition
> }})
>
> });
>
> since foreachRDD is operation so it spawns spark job but these jobs are
> not coming on driver console like in map and print function as
>
> 1.spark streaming 1.3 creates as many RDD Partitions as there are kafka
> partitions in topic. Say I have 300 partitions in topic and 10 executors
> and each with 3 cores so , is it means at a time only 10*3=30 partitions
> are processed and then 30 like that since executors launch tasks per RDD
> partitions , so I need in total; 300 tasks but since I have 30 cores(10
> executors each with 3 cores) so these tasks will execute 30 after 30 till
> 300.
>
> So reducing no of kafka paartitions to say 100 will speed up the
> processing?
>
> 2.In spark streaming job when I processed the kafka stream using foreachRDD
>
> directKafkaStream.foreachRDD(new function( public void call(  vi)){
> v1.foreachPartition(new function(){public void call(){
> //..process partition
> }})
>
> });
>
> since foreachRDD is operation so it spawns spark job but these jobs
> timings are not coming on driver console like in map and print function as
>
>
> -------------------------------------------
> Time: 1429054870000 ms
> -------------------------------------------
> ------------------------------------------
> Time: 1429054871000 ms
> -------------------------------------------
>
> ..................
>
> Why is it so?
>
>
> Thanks
> Shushant
>
>
>
>
>
>