Posted to user@spark.apache.org by Aniruddh Sharma <as...@gmail.com> on 2015/07/08 13:21:21 UTC

Problem in Understanding concept of Physical Cores

Hi

I am new to Spark. Following is the problem that I am facing

Test 1) I ran a VM on a CDH distribution with only 1 core allocated to it,
and I ran a simple Streaming example in spark-shell, sending data to port
7777 and trying to read it. With 1 core allocated, nothing happens in my
streaming program and it does not receive any data. When I restart the VM
with 2 cores allocated, start spark-shell again and run the same Streaming
example, it works.
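
The example is essentially this (a minimal sketch; the data source is
assumed to be something like nc -lk 7777 in another terminal):

********************************************************************************
// paste into spark-shell (sc is provided by the shell)
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(1))        // 1-second batches
val lines = ssc.socketTextStream("localhost", 7777)   // receiver listening on port 7777
lines.print()                                         // print a few lines of each batch
ssc.start()
********************************************************************************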

Query a): From this test I concluded that the Receiver in Streaming
occupies its core completely, even though I am sending very little data and
the Receiver does not need a whole core for it, and that this core is not
given to the Executor for computing transformations. Comparing partition
processing with Receiver processing: the same physical core can process
multiple partitions in parallel, but a Receiver will not let its core
process anything else. Is this understanding correct?

Test 2) Now I restarted the VM with 1 core again and started spark-shell
--master local[2]. I have allocated only 1 core to the VM but I tell
spark-shell to use 2 cores, and when I test the streaming program again it
works.

Query b) Now I am more confused, because the VM still has only 1 core. I
previously thought Test 1 failed because the single core was completely
blocked by the Receiver and not shared with the Executor. But when I start
with local[2], still with only 1 core in the VM, it works. That means the
Receiver and the Executor are both getting the same physical CPU. Please
explain how this case is different and what conclusions I should draw
about physical CPU usage.

Thanks and Regards
Aniruddh

Fwd: Problem in Understanding concept of Physical Cores

Posted by Aniruddh Sharma <as...@gmail.com>.
Hi

Apologies for posting these queries again, but they remain unanswered and
I am unable to work out the differences in parallelism between Standalone
mode and YARN and how each depends on physical cores. I need to understand
this to decide in which mode we should deploy Spark.

If these queries are too naïve, please point me to documentation that
covers them and I will read it in detail.

Following is the context for the queries:

a) I need to decide whether to deploy Spark in Standalone mode or on YARN.
It seems to me that Spark on YARN is more parallel than Standalone mode
(given the same number of physical cores), because on YARN the number of
execution threads can be increased with the --executor-cores option.
b) I also need to understand something that is not clear to me, and that
other people on this mailing list raise in different words when tuning
jobs: a JVM can theoretically support thousands of threads, but in the
context of Spark, what is the advisable ratio of physical cores to threads
to partitions? If there is a good link that explains this for both YARN
and Standalone mode, that would also help; a small sketch of the kind of
choice I mean follows below.
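
Purely for concreteness, this is the kind of decision I am asking about (a
minimal sketch; the multiplier 3 and the input path are made up for
illustration only):

********************************************************************************
// how many partitions per available core is sensible?
val parts = sc.defaultParallelism * 3                   // 3x is an arbitrary illustration
val data  = sc.textFile("hdfs:///some/input", parts)    // "/some/input" is a made-up path
println(data.partitions.length)                         // actual count (minPartitions is only a lower bound)
********************************************************************************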

And here are the queries:

Case 1: Standalone Spark
In standalone mode, as you explained, when no master is given spark-submit
uses local[*] implicitly, so it creates as many threads as the VM has
cores; the user controls the number of partitions to create, and tasks are
created accordingly, one per partition.

Query 1: If I have 4 cores, then 4 threads will be created, but if I give
my data 40 partitions, then 40 tasks will be created which need to be
executed on those 4 threads. Does it work this way: the 4 threads execute
4 tasks (out of the 40) in parallel, and when the first set of tasks
completes they pick up the next 4, and so on, so that 4 tasks run
concurrently and the remaining tasks run in later waves once the earlier
ones finish? (A sketch of what I mean is below.)
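
A minimal sketch of this situation (the numbers are only an illustration):

********************************************************************************
// spark-shell --master local[4]
val rdd = sc.parallelize(1 to 1000, 40)                // 40 partitions -> 40 tasks
rdd.map { x => Thread.sleep(100); x * 2 }.count()      // only 4 tasks run at any one time
// (the web UI on port 4040 shows the tasks executing in waves of 4)
********************************************************************************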

Query 2: When we pass total-num-cores to Spark in Standalone mode, the
number of threads does not seem to increase. When I check
sc.defaultParallelism, it does not seem to be affected by the
total-num-cores value that was passed. So what exactly does this parameter
mean? Does it control the number of threads, or does it tell the Spark
Master to give this many physical cores to the job? In other words, is the
parameter relevant not to a single job but, when multiple jobs run on the
cluster, to telling the Spark scheduler not to over-allocate resources to
one job? Also, does setting this parameter guarantee any behaviour, or is
it only a hint to the Spark scheduler?
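
The check I did is roughly the following (a sketch; I am assuming the
option I refer to as total-num-cores is spark-shell/spark-submit's
--total-executor-cores, and <master-host> is a placeholder):

********************************************************************************
// spark-shell --master spark://<master-host>:7077 --total-executor-cores 2
sc.defaultParallelism                                   // did not appear to change with the value passed
sc.parallelize(1 to 8, 8).map { x => Thread.sleep(5000); x }.count()
// the web UI on port 4040 shows how many of the 8 tasks actually run at once
********************************************************************************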


Case 2: Spark on YARN
On YARN, the number of threads created does not seem to be based on the
number of underlying physical cores.

Query 3: It seems to be (defaultMinPartitions * executor-cores) instead.
Is this understanding correct? If yes, does it mean the developer can
control the number of threads requested from Spark by passing the
--executor-cores option (which was not possible in Standalone mode, where
the number of threads was based on the number of physical cores)? Is there
a particular reason for this difference?
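
For reference, the kind of launch I mean (a sketch; the flag values are
only an example):

********************************************************************************
// spark-shell --master yarn-client --num-executors 1 --executor-cores 4
sc.parallelize(1 to 8, 8).map { x => Thread.sleep(5000); x }.count()
// each executor runs up to executor-cores tasks at the same time,
// which is what I am calling "threads" above
********************************************************************************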

Query 4: There also seems to be a limit on the value I can pass to
executor-cores, which appears to depend on the underlying physical cores.
For example, if I have 4 cores and I pass 20, it works, but if I pass 100
it does not. So it seems the number of threads that can actually be
created inside the JVM is still limited by the number of physical cores,
although it can be raised via the executor-cores option. Kindly elaborate
on the best practice for how many threads to request relative to the
physical cores, and on how the physical cores limit this behaviour.

Query 5: Is there a reason for the difference in behaviour between
total-num-cores in Standalone mode (which does not create threads) and
executor-cores in YARN mode (which does) with respect to how threads are
created? It seems that in YARN mode we can create more threads in the same
Executor JVM than in Standalone mode for the same number of physical cores.

Thanks and Regards
Aniruddh

Re: Problem in Understanding concept of Physical Cores

Posted by Aniruddh Sharma <as...@gmail.com>.
Dear Community

Request for help on the queries below; they are still unanswered.

Thanks and Regards
Aniruddh


Re: Problem in Understanding concept of Physical Cores

Posted by Aniruddh Sharma <as...@gmail.com>.
Hi TD,

Request your guidance on the five queries below. Following is the context
in which I will evaluate your response.

a) I need to decide whether to deploy Spark in Standalone mode or on YARN.
It seems to me that Spark on YARN is more parallel than Standalone mode
(given the same number of physical cores), because on YARN the number of
execution threads can be increased with the --executor-cores option.
b) I also need to understand something that is not clear to me, and that
other people on this mailing list raise in different words when tuning
jobs: a JVM can theoretically support thousands of threads, but in the
context of Spark, what is the advisable ratio of physical cores to threads
to partitions? If there is a good link that explains this for both YARN
and Standalone mode, that would also help.

Thanks and Regards
Aniruddh


Re: Problem in Understanding concept of Physical Cores

Posted by Aniruddh Sharma <as...@gmail.com>.
Hi TD,

Thanks for the elaboration. I have further doubts based on additional
tests I ran after your guidance.

Case 1: Standalone Spark
In standalone mode, as you explained, when no master is given spark-submit
uses local[*] implicitly, so it creates as many threads as the VM has
cores; the user controls the number of partitions to create, and tasks are
created accordingly, one per partition.

Query 1: If I have 4 cores, then 4 threads will be created, but if I give
my data 40 partitions, then 40 tasks will be created which need to be
executed on those 4 threads. Does it work this way: the 4 threads execute
4 tasks (out of the 40) in parallel, and when the first set of tasks
completes they pick up the next 4, and so on, so that 4 tasks run
concurrently and the remaining tasks run in later waves once the earlier
ones finish?

Query 2: When we pass total-num-cores to Spark in Standalone mode, the
number of threads does not seem to increase. When I check
sc.defaultParallelism, it does not seem to be affected by the
total-num-cores value that was passed. So what exactly does this parameter
mean? Does it control the number of threads, or does it tell the Spark
Master to give this many physical cores to the job? In other words, is the
parameter relevant not to a single job but, when multiple jobs run on the
cluster, to telling the Spark scheduler not to over-allocate resources to
one job? Also, does setting this parameter guarantee any behaviour, or is
it only a hint to the Spark scheduler?


Case 2: Spark on YARN
On YARN, the number of threads created does not seem to be based on the
number of underlying physical cores.

Query 3: It seems to be (defaultMinPartitions * executor-cores) instead.
Is this understanding correct? If yes, does it mean the developer can
control the number of threads requested from Spark by passing the
--executor-cores option (which was not possible in Standalone mode, where
the number of threads was based on the number of physical cores)? Is there
a particular reason for this difference?

Query 4: There also seems to be a limit on the value I can pass to
executor-cores, which appears to depend on the underlying physical cores.
For example, if I have 4 cores and I pass 20, it works, but if I pass 100
it does not. So it seems the number of threads that can actually be
created inside the JVM is still limited by the number of physical cores,
although it can be raised via the executor-cores option. Kindly elaborate
on the best practice for how many threads to request relative to the
physical cores, and on how the physical cores limit this behaviour.

Query 5: Is there a reason for the difference in behaviour between
total-num-cores in Standalone mode (which does not create threads) and
executor-cores in YARN mode (which does) with respect to how threads are
created? It seems that in YARN mode we can create more threads in the same
Executor JVM than in Standalone mode for the same number of physical cores.

Thanks and Regards
Aniruddh





Re: Problem in Understanding concept of Physical Cores

Posted by Tathagata Das <td...@databricks.com>.
Query 1) What Spark runs is tasks in task slots; the mapping of tasks to
physical cores does not matter. If there are two task slots (2 threads in
local mode, or an executor with 2 task slots in distributed mode), it can
only run two tasks concurrently. That is true even if a task is really not
doing much; there is no multiplexing going on between tasks and task
slots. So to answer your query 1: there is 1 thread permanently allocated
to the receiver task (a long-running task) even if it does not do much,
and there is no thread left to process the data that is being received.

Query 2) I think this is already explained above. The receiver task is
taking the only available slot, leaving nothing for the actual tasks to
execute. This will work fine as long as there are n+1 threads, where n =
number of receivers.

Query 3) The 2nd thread will be running tasks that process the in-memory
blocks of data generated by the receiver running on the first thread. If
the operating system underneath has only one core (physical or virtual),
then those two threads will be multiplexing the resources of that core.
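
Concretely, for the socket example in this thread (one receiver), a
minimal sketch that leaves a slot free for processing:

********************************************************************************
// spark-shell --master local[2]   (local[1] would starve the processing tasks)
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 7777)   // this receiver pins one task slot for its lifetime
lines.print()                                         // the per-batch jobs need the second slot to run
ssc.start()
********************************************************************************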




Re: Problem in Understanding concept of Physical Cores

Posted by Aniruddh Sharma <as...@gmail.com>.
Thanks for the reply. I still have some confusion; kindly find my understanding below.

Following is the code
********************************************************************************
// spark-shell already provides `sc`; StreamingContext and Seconds need this import
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(1))        // 1-second micro-batches
val lines = ssc.socketTextStream("localhost", 7777)   // receiver reading from port 7777
lines.print()                                         // output operation run on every batch
ssc.start()                                           // starts the receiver and the batch jobs
********************************************************************************

Case 1: When I launch the VM with only 1 core and start spark-shell without
any master parameter, then as per the above explanation it uses local[*]
implicitly and creates 1 thread, since the VM has 1 core.

Query 1) But what does it try to execute in that 1 thread? Is it the
Receiver that does not get executed, or the task? The Receiver is not heavy
(I am entering only 1 line), so shouldn't the same physical core be shared
between the Receiver (an internal thread) and the thread running the task?
For example, my VM has 1 physical core, yet multiple daemons such as the
master and worker also run successfully while sharing that single physical
core. My understanding is that the Executor is a JVM in which the Receiver
executes as an internal thread, and 1 more thread (for executing the task)
is created in the same JVM, but for some reason it does not get CPU time.
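
To check what local[*] detects on this VM, the following can be run in
spark-shell (a diagnostic sketch only; the variable names are mine, and the
numbers depend on how many cores the hypervisor exposes to the guest OS):
********************************************************************************
// How many cores the JVM can see; local[*] sizes its scheduler pool from this
val visibleCores = Runtime.getRuntime.availableProcessors()
// In local mode, the default parallelism follows the number of scheduler threads
val slots = sc.defaultParallelism
println(s"JVM sees $visibleCores core(s); Spark has $slots task slot(s)")
********************************************************************************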

Query 2) Extending the above analogy to another case, not in Spark
Streaming but in normal Spark core: if I read input data with 3 partitions
on only 1 physical core and perform some action on it, then 3 tasks should
still be created, and each task should be handled in a separate thread
inside the executor JVM. This also works, which means a single physical
core executes 3 different threads running 3 tasks for 3 partitions. So why
does the Streaming case not execute?
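
For reference, the non-streaming case I am describing is roughly the
following (a sketch with a throwaway dataset, just to show 3 tasks sharing
1 core):
********************************************************************************
// Force 3 partitions, so the single stage produces 3 tasks
val rdd = sc.parallelize(1 to 9, 3)
// Tag each element with the thread that processed it; with local[1] the 3
// tasks simply run one after another on the same task thread
val byThread = rdd.map(x => (Thread.currentThread.getName, x)).collect()
byThread.foreach(println)
********************************************************************************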

Case 2: When I launch the VM with only 1 core and start spark-shell with
--master local[2], then as per the above explanation it uses local[2] and
creates 2 threads, but my VM still has only 1 physical core.

Query 3) Now 2 threads are created, but my input data has 1 partition, so it
still requires only 1 task, and the Receiver is an internal thread in the
Executor JVM. What extra work runs in the second thread in this case that
was not getting executed in the earlier case with only 1 thread? And even if
2 threads are created, they still have to be executed by the same physical
core, so kindly elaborate what the extra thread processes in this case.
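
For completeness, the standalone equivalent of what I am running would look
roughly like this (a sketch; the object name and app name are made up, and
it follows the rule of thumb that a receiver-based streaming app needs at
least 2 task slots, one for the long-running receiver task and one for the
per-batch tasks):
********************************************************************************
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketStreamDemo {
  def main(args: Array[String]): Unit = {
    // At least 2 threads: one slot is pinned by the receiver, one runs batch tasks
    val conf = new SparkConf().setMaster("local[2]").setAppName("SocketStreamDemo")
    val ssc = new StreamingContext(conf, Seconds(1))
    val lines = ssc.socketTextStream("localhost", 7777)
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
********************************************************************************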

Thanks and Regards
Aniruddh


Re: Problem in Understanding concept of Physical Cores

Posted by Tathagata Das <td...@databricks.com>.
There are several levels of indirection going on here; let me clarify.

In local mode, Spark runs tasks (which include receivers) using the number
of threads defined in the master (either local, local[2], or local[*]).
local or local[1] = single thread, so only one task at a time
local[2] = 2 threads, so two tasks
local[*] = as many threads as the number of cores it can detect through the
operating system.
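
In code form, the mapping looks roughly like this (a sketch; the app name is
made up):
********************************************************************************
import org.apache.spark.SparkConf

// "local" / "local[1]" -> 1 thread: not enough for a receiver plus batch tasks
// "local[2]"           -> 2 threads: one can hold the receiver, one runs batch tasks
// "local[*]"           -> Runtime.getRuntime.availableProcessors() threads
val conf = new SparkConf().setMaster("local[2]").setAppName("ThreadCountDemo")
********************************************************************************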


Test 1: When you don't specify a master in spark-submit, it uses local[*]
implicitly, so it uses as many threads as the number of cores the VM has.
The difference you saw between 1 and 2 VM cores is therefore expected.
Test 2: When you specified the master as local[2], it used two threads.
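
To confirm which master actually took effect, a quick check from inside the
shell is enough (a sketch; the printed value depends on how the shell was
launched):
********************************************************************************
// Prints e.g. "local[*]" or "local[2]"
println(sc.master)
********************************************************************************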

HTH

TD
