Posted to user@spark.apache.org by Matei Zaharia <ma...@gmail.com> on 2013/10/02 22:00:02 UTC

Re: Some questions about task distribution and execution in Spark

Hi Shangyu,

> (1) When we read in a local file with SparkContext.textFile and run a map/reduce job on it, how does Spark decide which worker node each piece of data is sent to? Is the data partitioned equally across the worker nodes, so that each worker node gets one piece?

You actually can't run distributed jobs on local files. The local file URL only works on the same machine, or if the file is in a filesystem that's mounted on the same path on all worker nodes.

> (2) If we read in data via HDFS, how will the above process work?

Spark uses the data placement information from HDFS to schedule tasks locally on each block of the file. By default it creates one task per block of the file (blocks are usually 64-128 MB), though you can ask for more tasks or use RDD.coalesce() to get fewer.
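
To make the one-task-per-block rule concrete, here is a rough sketch of the arithmetic in plain Python. It is a simplified model of how Hadoop-style input splitting is commonly described, not Spark's actual code: the function name `estimate_partitions` and its parameters are made up for illustration, and details such as the rounding slack Hadoop applies to the last split are ignored.

```python
import math

MB = 1024 * 1024

def estimate_partitions(file_size, block_size, min_splits=1):
    """Estimate the number of input partitions (and so tasks) for one file.

    Simplified model (an assumption, not Spark's real implementation):
    the target split size is file_size / min_splits, capped at block_size.
    """
    if file_size <= 0:
        return 0
    goal_size = max(1, file_size // max(1, min_splits))
    split_size = min(goal_size, block_size)
    return math.ceil(file_size / split_size)

# A 1 GB file stored in 128 MB blocks: one partition per block.
print(estimate_partitions(1024 * MB, 128 * MB))                 # 8
# Asking for more splits than there are blocks yields smaller partitions.
print(estimate_partitions(1024 * MB, 128 * MB, min_splits=32))  # 32
```

In this model a request for fewer splits than there are blocks has no effect, which is why fewer partitions have to be obtained afterwards with RDD.coalesce().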

> (3) SparkContext.textFile has a parameter 'minSplits'. Is it used to divide the input file into 'minSplits' partitions? If so, how do we know how many partitions each worker node receives?

Yup, it's a lower bound on the number of partitions (files with more blocks might get more). The assignment to workers is not static. Spark just gives workers tasks as their previous tasks finish, so that you can get good load balancing even if some workers are faster than others.
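
The dynamic assignment described above can be illustrated with a small simulation in plain Python. This is only a toy model under stated assumptions: the worker names and per-task durations are hypothetical, every task does the same amount of work, and the next task always goes to whichever worker becomes free first. It does not use or reproduce Spark's scheduler.

```python
import heapq
from collections import Counter

def simulate(num_tasks, seconds_per_task):
    """Hand out tasks one at a time to whichever worker is free first.

    seconds_per_task maps a (hypothetical) worker name to how long that
    worker needs for one task. Returns (tasks per worker, finish time).
    """
    # Heap of (time at which the worker becomes free, worker name).
    free_at = [(0.0, name) for name in sorted(seconds_per_task)]
    heapq.heapify(free_at)
    counts = Counter()
    finish = 0.0
    for _ in range(num_tasks):
        now, name = heapq.heappop(free_at)
        done = now + seconds_per_task[name]
        counts[name] += 1
        finish = max(finish, done)
        heapq.heappush(free_at, (done, name))
    return dict(counts), finish

counts, finish = simulate(6, {"fast": 1.0, "slow": 2.0})
print(counts, finish)  # {'fast': 4, 'slow': 2} 4.0
```

With a fixed 3/3 split the slow worker would need 6 seconds for its share; handing out tasks on demand lets the fast worker take four of the six and both finish at 4 seconds.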

> (4) We can set spark.default.parallelism as a system property. Is this parameter applied per worker node? Say each worker node has 8 cores: if we set spark.default.parallelism=32, will each core need to handle 4 tasks? We can also set SPARK_WORKER_INSTANCES in spark-env.sh. For the same worker node, if we set SPARK_WORKER_INSTANCES=8, SPARK_WORKER_CORES=1 and spark.default.parallelism=32, will each core still be sent 4 tasks? Will the performance of the whole system differ between these two configurations?

No, this setting applies to an entire job, not to each worker node. You should set it to the number of cores in your cluster, or multiply that by 2-3 to get better load balancing, though oftentimes Spark's default for this works well too.
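
As a worked example of that rule of thumb, the arithmetic might look like the following Python sketch. The cluster size of 4 worker nodes is a made-up figure (the thread only mentions 8 cores per node), and `suggested_parallelism` is a hypothetical helper, not a Spark API.

```python
def suggested_parallelism(nodes, workers_per_node, cores_per_worker, factor=1):
    """Total cores in the cluster times a load-balancing factor (1 to 3)."""
    total_cores = nodes * workers_per_node * cores_per_worker
    return total_cores * factor

# One worker with 8 cores per node, or 8 workers with 1 core each:
# the cluster-wide core count, and so the suggestion, is the same.
a = suggested_parallelism(nodes=4, workers_per_node=1, cores_per_worker=8)
b = suggested_parallelism(nodes=4, workers_per_node=8, cores_per_worker=1)
print(a, b)  # 32 32

# Oversubscribing by a factor of 3 for better load balancing.
print(suggested_parallelism(4, 1, 8, factor=3))  # 96
```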

> (5) Is each map/reduce job counted as one task? For example, will
> sc.parallelize([0,1,2,3]).map(lambda x: x) produce four tasks?

A "task" has a specific meaning in Spark: it is one unit of work that runs on one node. See http://spark.incubator.apache.org/docs/latest/cluster-overview.html for an overview of the terminology. The number of tasks depends on the number of partitions (blocks) in the RDD. In this case, sc.parallelize will probably create as many tasks as you have CPU cores, which is the default unless you give it another value. You can view the exact number of tasks on the job monitoring UI in Spark 0.8 (http://spark.incubator.apache.org/docs/latest/monitoring.html).
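
To illustrate why the task count follows the number of partitions rather than the number of elements, here is a plain-Python sketch of splitting a small collection into slices. The index arithmetic is modelled on how Spark's Scala parallelize is generally understood to slice a collection; treat that as an assumption. `slice_collection` is a hypothetical helper, and PySpark may batch elements differently.

```python
def slice_collection(items, num_slices):
    """Split items into num_slices contiguous slices (some may be empty)."""
    n = len(items)
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

# Four elements on a (hypothetical) 8-core machine: 8 partitions, so 8 tasks,
# half of which have nothing to do.
print(slice_collection([0, 1, 2, 3], 8))
# [[], [0], [], [1], [], [2], [], [3]]

# Explicitly asking for 2 slices gives 2 tasks with two elements each.
print(slice_collection([0, 1, 2, 3], 2))
# [[0, 1], [2, 3]]
```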

Matei

> 
> Any help will be appreciated.
> Thanks!
> 
> 
> 
> 
> -- 
> --
> 
> Shangyu, Luo
> Department of Computer Science
> Rice University
> 


Re: Some questions about task distribution and execution in Spark

Posted by Eduardo Berrocal <eb...@hawk.iit.edu>.
The Spark code is in my /home directory, which is shared over NFS with all
nodes, so all workers should be able to access the same file.


On Thu, Oct 3, 2013 at 2:34 PM, Mark Hamstra <ma...@clearstorydata.com> wrote:

> But the worker has to be on a node that has local access to the file.

Re: Some questions about task distribution and execution in Spark

Posted by Shay Seng <sh...@1618labs.com>.
Ah, ok. Thanks for the clarification.

When I create a file that is only visible on the master I get the following
error...
f.map(l=>l.split(" ")).collect
13/10/03 20:38:48 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/10/03 20:38:48 WARN snappy.LoadSnappy: Snappy native library not loaded
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/root/ue/onlymaster
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)


On Thu, Oct 3, 2013 at 12:34 PM, Mark Hamstra <ma...@clearstorydata.com> wrote:

> But the worker has to be on a node that has local access to the file.

Re: Some questions about task distribution and execution in Spark

Posted by Mark Hamstra <ma...@clearstorydata.com>.
But the worker has to be on a node that has local access to the file.


On Thu, Oct 3, 2013 at 12:30 PM, Shay Seng <sh...@1618labs.com> wrote:

> Ok, even if my understanding of allowLocal is incorrect, nevertheless
> (1) I'm loading a local file
> (2) The tasks appear to be executing on a slave node
> (ip-10-129-25-28), which is not my master node
> ??

Re: Some questions about task distribution and execution in Spark

Posted by Shay Seng <sh...@1618labs.com>.
Ok, even if my understanding of allowLocal is incorrect, nevertheless
(1) I'm loading a local file
(2) The tasks appear to be executing on a slave node
(ip-10-129-25-28), which is not my master node
??




On Thu, Oct 3, 2013 at 12:22 PM, Mark Hamstra <ma...@clearstorydata.com> wrote:

> No, that is not what allowLocal means.  For a very few actions, the
> DAGScheduler will run the job locally (in a separate thread on the master
> node) if the RDD in the action has a single partition and no dependencies
> in its lineage.  If allowLocal is false, that doesn't mean that
> SparkContext.textFile called on a local file will magically turn that local
> file into a distributed file and allow more than just the node where the
> file is local to process that file.

Re: Some questions about task distribution and execution in Spark

Posted by Mark Hamstra <ma...@clearstorydata.com>.
No, that is not what allowLocal means.  For a very few actions, the
DAGScheduler will run the job locally (in a separate thread on the master
node) if the RDD in the action has a single partition and no dependencies
in its lineage.  If allowLocal is false, that doesn't mean that
SparkContext.textFile called on a local file will magically turn that local
file into a distributed file and allow more than just the node where the
file is local to process that file.
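
The condition described above can be written down as a small predicate. This Python sketch is a paraphrase of the rule as stated in this message, not the DAGScheduler source; the function and parameter names are hypothetical, and reading "no dependencies in its lineage" as "zero parent stages" is an assumption.

```python
def runs_locally(allow_local, num_partitions, num_parent_stages):
    """True if, per the rule above, the job would run on the master node.

    allow_local:       whether the action permits local execution
    num_partitions:    partitions of the RDD the action is called on
    num_parent_stages: stages the final stage depends on (0 = no dependencies)
    """
    return allow_local and num_partitions == 1 and num_parent_stages == 0

# The collect job in Shay's log: 2 output partitions and allowLocal=false,
# so it is handed to the cluster scheduler.
print(runs_locally(False, 2, 0))  # False

# A single-partition RDD with no dependencies, under an action that allows it.
print(runs_locally(True, 1, 0))   # True
```

Neither outcome changes which machines are able to read a local file.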


On Thu, Oct 3, 2013 at 11:05 AM, Shay Seng <sh...@1618labs.com> wrote:

> Inlined.
>
> On Wed, Oct 2, 2013 at 1:00 PM, Matei Zaharia <ma...@gmail.com>wrote:
>
>> Hi Shangyu,
>>
>> (1) When we read in a local file by SparkContext.textFile and do some
>> map/reduce job on it, how will spark decide to send data to which worker
>> node? Will the data be divided/partitioned equally according to the number
>> of worker node and each worker node get one piece of data?
>>
>> You actually can't run distributed jobs on local files. The local file
>> URL only works on the same machine, or if the file is in a filesystem
>> that's mounted on the same path on all worker nodes.
>>
>> Is this really true?
>
> scala> val f = sc.textFile("/root/ue/ue_env.sh")
> 13/10/03 17:55:45 INFO storage.MemoryStore: ensureFreeSpace(34870) called
> with curMem=34870, maxMem=4081511301
> 13/10/03 17:55:45 INFO storage.MemoryStore: Block broadcast_1 stored as
> values to memory (estimated size 34.1 KB, free 3.8 GB)
> f: spark.RDD[String] = MappedRDD[5] at textFile at <console>:12
>
> scala> f.map(l=>l.split(" ")).collect
> 13/10/03 17:55:51 INFO mapred.FileInputFormat: Total input paths to
> process : 1
> 13/10/03 17:55:51 INFO spark.SparkContext: Starting job: collect at
> <console>:15
> 13/10/03 17:55:51 INFO scheduler.DAGScheduler: Got job 2 (collect at
> <console>:15) with 2 output partitions (*allowLocal=false*)
>  ...
> 13/10/03 17:55:51 INFO cluster.TaskSetManager: Starting task 2.0:0 as TID
> 4 on executor 0: ip-10-129-25-28 (preferred)
> 13/10/03 17:55:51 INFO cluster.TaskSetManager: Serialized task 2.0:0 as
> 1517 bytes in 3 ms
> 13/10/03 17:55:51 INFO cluster.TaskSetManager: Starting task 2.0:1 as TID
> 5 on executor 0: ip-10-129-25-28 (preferred)
> 13/10/03 17:55:51 INFO cluster.TaskSetManager: Serialized task 2.0:1 as
> 1517 bytes in 0 ms
>
> Doesn't allowLocal=false mean the job is getting distributed to workers
> rather than computed locally?
>
>
> tks
>

Re: Some questions about task distribution and execution in Spark

Posted by Shay Seng <sh...@1618labs.com>.
Inlined.

On Wed, Oct 2, 2013 at 1:00 PM, Matei Zaharia <ma...@gmail.com> wrote:

> Hi Shangyu,
>
> (1) When we read in a local file by SparkContext.textFile and do some
> map/reduce job on it, how will spark decide to send data to which worker
> node? Will the data be divided/partitioned equally according to the number
> of worker node and each worker node get one piece of data?
>
> You actually can't run distributed jobs on local files. The local file URL
> only works on the same machine, or if the file is in a filesystem that's
> mounted on the same path on all worker nodes.
>
Is this really true?

scala> val f = sc.textFile("/root/ue/ue_env.sh")
13/10/03 17:55:45 INFO storage.MemoryStore: ensureFreeSpace(34870) called with curMem=34870, maxMem=4081511301
13/10/03 17:55:45 INFO storage.MemoryStore: Block broadcast_1 stored as values to memory (estimated size 34.1 KB, free 3.8 GB)
f: spark.RDD[String] = MappedRDD[5] at textFile at <console>:12

scala> f.map(l=>l.split(" ")).collect
13/10/03 17:55:51 INFO mapred.FileInputFormat: Total input paths to process : 1
13/10/03 17:55:51 INFO spark.SparkContext: Starting job: collect at <console>:15
13/10/03 17:55:51 INFO scheduler.DAGScheduler: Got job 2 (collect at <console>:15) with 2 output partitions (*allowLocal=false*)
 ...
13/10/03 17:55:51 INFO cluster.TaskSetManager: Starting task 2.0:0 as TID 4 on executor 0: ip-10-129-25-28 (preferred)
13/10/03 17:55:51 INFO cluster.TaskSetManager: Serialized task 2.0:0 as 1517 bytes in 3 ms
13/10/03 17:55:51 INFO cluster.TaskSetManager: Starting task 2.0:1 as TID 5 on executor 0: ip-10-129-25-28 (preferred)
13/10/03 17:55:51 INFO cluster.TaskSetManager: Serialized task 2.0:1 as 1517 bytes in 0 ms

Doesn't allowLocal=false mean the job is getting distributed to workers
rather than computed locally?


tks



Re: Some questions about task distribution and execution in Spark

Posted by Shangyu Luo <ls...@gmail.com>.
Hello Matei,
Thank you very much for your detailed reply!




-- 
--

Shangyu, Luo
Department of Computer Science
Rice University

--
Not Just Think About It, But Do It!
--
Success is never final.
--
Losers always whine about their best