Posted to dev@spark.apache.org by gsvic <vi...@gmail.com> on 2015/09/17 14:23:05 UTC

RDD: Execution and Scheduling

After reading parts of the Spark source code, I would like to ask some
questions about RDD execution and scheduling.

First, please correct me if I am wrong about the following:
1) The number of partitions equals the number of tasks that will be executed
in parallel (e.g., when an RDD is repartitioned into 30 partitions, a count
aggregate will be executed as 30 tasks distributed across the cluster).

2) A task
<https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/Task.scala>
concerns only one partition (partitionId: Int), and this partition maps to an
RDD block.

3) If an RDD is cached, then the preferred location for executing a task on
a partition is the node where the corresponding RDD block is cached.

The questions are the following:

I run some SQL aggregate functions on a TPC-H dataset. The cluster
consists of 7 executors (and one driver), each with 8 GB of RAM and 4
vCPUs. The dataset is stored in Parquet format on an external Hadoop cluster;
that is, Spark workers and Hadoop DataNodes run on different VMs.

1) For a count aggregate, I repartitioned the DataFrame into 24 partitions,
and each executor took 2 partitions (tasks) for execution. Does it always
happen this way (i.e., is the number of tasks per node equal to
#Partitions/#Workers)?

2) How does Spark choose the executor for each task if the data is not
cached? It is clear from DAGScheduler.scala
<https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1541>
what happens if the data is cached, but what if it is not? Is it possible to
determine that before execution?

3) In the case of a SQL join operation, is it possible to determine how
many tasks/partitions will be generated and to which worker each task will
be submitted?



--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/RDD-Execution-and-Scheduling-tp14177.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: RDD: Execution and Scheduling

Posted by gsvic <vi...@gmail.com>.
I already have, but I needed some clarifications. Thanks for all your help!





Re: RDD: Execution and Scheduling

Posted by Reynold Xin <rx...@databricks.com>.
On Sun, Sep 20, 2015 at 3:58 PM, gsvic <vi...@gmail.com> wrote:

> Concerning answers 1 and 2:
>
> 1) How does Spark determine that a node is a "slow node", and how slow is
> that?
>

There are two cases here:

1. If a node is busy (e.g. all slots are already occupied), the scheduler
cannot schedule anything on it. See the paper "Delay Scheduling: A Simple
Technique for Achieving Locality and Fairness in Cluster Scheduling" for how
locality scheduling is done.

2. Within the same stage, if a task is slower than other tasks, a copy of
it can be launched speculatively in order to mitigate stragglers. Search
for speculation in the code base to find out more.
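
As a rough illustration of the second case, the straggler check can be
modeled as below. This is a simplified sketch, not Spark's code: the function
name find_speculatable is made up for the example, and the default values
mirror the documented defaults of spark.speculation.quantile (0.75) and
spark.speculation.multiplier (1.5). Speculation only happens when
spark.speculation is set to true.

```python
from statistics import median


def find_speculatable(finished_durations, running_durations, total_tasks,
                      quantile=0.75, multiplier=1.5):
    """Return indices of running tasks that look like stragglers.

    finished_durations: run times (seconds) of tasks that already succeeded.
    running_durations: elapsed times (seconds) of tasks still running.
    """
    # Wait until a large enough fraction of the stage has finished.
    if len(finished_durations) < int(quantile * total_tasks):
        return []
    # A running task counts as slow once it exceeds multiplier x median.
    threshold = multiplier * median(finished_durations)
    return [i for i, elapsed in enumerate(running_durations)
            if elapsed > threshold]


finished = [10.0, 11.0, 9.0, 10.0, 12.0, 10.0]
running = [12.0, 40.0]
stragglers = find_speculatable(finished, running, total_tasks=8)
print(stragglers)  # prints [1]
```

With a median of 10 seconds the threshold is 15 seconds, so only the task
that has been running for 40 seconds qualifies for a speculative copy.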



> 2) How does an RDD choose a preferred location, and by which
> criteria?
>

This is part of the RDD definition. The RDD interface itself defines
locality. The Spark NSDI paper already talks about this.

Why don't you just do a little bit of code reading yourself?



>
> Could you please also include links to the source files for the two
> questions above?
>
>
>

Re: RDD: Execution and Scheduling

Posted by gsvic <vi...@gmail.com>.
Concerning answers 1 and 2:

1) How does Spark determine that a node is a "slow node", and how slow is
that?

2) How does an RDD choose a preferred location, and by which criteria?

Could you please also include links to the source files for the two
questions above?





Re: RDD: Execution and Scheduling

Posted by Reynold Xin <rx...@databricks.com>.
Your understanding is mostly correct. Replies inline.

On Thu, Sep 17, 2015 at 5:23 AM, gsvic <vi...@gmail.com> wrote:

> After reading parts of the Spark source code, I would like to ask some
> questions about RDD execution and scheduling.
>
> First, please correct me if I am wrong about the following:
> 1) The number of partitions equals the number of tasks that will be executed
> in parallel (e.g., when an RDD is repartitioned into 30 partitions, a count
> aggregate will be executed as 30 tasks distributed across the cluster).
>
> 2) A task
> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/Task.scala>
> concerns only one partition (partitionId: Int), and this partition maps to
> an RDD block.
>
> 3) If an RDD is cached, then the preferred location for executing a task on
> a partition is the node where the corresponding RDD block is cached.
>
> The questions are the following:
>
> I run some SQL aggregate functions on a TPC-H dataset. The cluster
> consists of 7 executors (and one driver), each with 8 GB of RAM and 4
> vCPUs. The dataset is stored in Parquet format on an external Hadoop
> cluster; that is, Spark workers and Hadoop DataNodes run on different VMs.
>
> 1) For a count aggregate, I repartitioned the DataFrame into 24 partitions,
> and each executor took 2 partitions (tasks) for execution. Does it always
> happen this way (i.e., is the number of tasks per node equal to
> #Partitions/#Workers)?
>

No, that is not always true. If a node is slower than the others, fewer tasks
will get scheduled there. Or if a node is busy running something else,
maybe no tasks will be scheduled there.
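
One way to see why the split is not fixed: a task is handed out whenever a
slot frees up, so a slower node completes fewer tasks in the same time. The
toy simulation below is only a sketch under that assumption. It ignores
locality and speculation, and the simulate function, the executor names, and
the speed factors are all invented for illustration.

```python
import heapq
from collections import Counter


def simulate(num_tasks, task_cost, executors):
    """Greedy slot-based assignment.

    executors: dict of name -> (slots, speed); a task takes task_cost / speed.
    Returns a Counter with the number of tasks run per executor.
    """
    # One heap entry per slot: (time the slot becomes free, executor name).
    free_at = [(0.0, name)
               for name, (slots, _) in sorted(executors.items())
               for _ in range(slots)]
    heapq.heapify(free_at)
    counts = Counter()
    for _ in range(num_tasks):
        # The next task goes to whichever slot frees up first.
        now, name = heapq.heappop(free_at)
        counts[name] += 1
        speed = executors[name][1]
        heapq.heappush(free_at, (now + task_cost / speed, name))
    return counts


# Two executors with one slot each; "slow" runs at half the speed of "fast".
counts = simulate(num_tasks=6, task_cost=10.0,
                  executors={"fast": (1, 1.0), "slow": (1, 0.5)})
print(dict(counts))  # prints {'fast': 4, 'slow': 2}
```

With equal speeds the same six tasks would split three and three, which is
the even distribution observed in the question.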


>
> 2) How does Spark choose the executor for each task if the data is not
> cached? It is clear from DAGScheduler.scala
> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1541>
> what happens if the data is cached, but what if it is not? Is it possible
> to determine that before execution?
>

The RDD itself still has preferred locations.
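
The lookup order can be sketched roughly as below. This is a toy model, not
Spark's classes: ToyRDD, its fields, and preferred_locs are hypothetical
names, and the order shown (cached locations first, then the RDD's own
preferences, then narrow parents) is an assumption based on what the
DAGScheduler file linked in the question appears to do. An empty result means
the task has no locality preference and can run on any free executor, which
is what one would expect when the hosts holding the input blocks run no
executors.

```python
class ToyRDD:
    """Hypothetical stand-in for an RDD with a one-to-one (narrow) parent."""

    def __init__(self, own_prefs=None, cached=None, parent=None):
        self.own_prefs = own_prefs or {}  # partition -> hosts of input blocks
        self.cached = cached or {}        # partition -> hosts with cached block
        self.parent = parent              # narrow parent, same partitioning


def preferred_locs(rdd, partition):
    # 1. A cached partition prefers the hosts that hold the cached block.
    if rdd.cached.get(partition):
        return rdd.cached[partition]
    # 2. Otherwise ask the RDD itself (input RDDs can report block hosts).
    if rdd.own_prefs.get(partition):
        return rdd.own_prefs[partition]
    # 3. Otherwise fall back to the narrow parent's preference, if any.
    if rdd.parent is not None:
        return preferred_locs(rdd.parent, partition)
    return []  # no preference: any executor will do


source = ToyRDD(own_prefs={0: ["datanode-1"], 1: ["datanode-2"]})
mapped = ToyRDD(parent=source, cached={1: ["worker-3"]})
print(preferred_locs(mapped, 0))  # prints ['datanode-1']
print(preferred_locs(mapped, 1))  # prints ['worker-3']
```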


>
> 3) In the case of a SQL join operation, is it possible to determine how
> many tasks/partitions will be generated and to which worker each task will
> be submitted?


Not sure what you mean. Right now, for a shuffle join it is hard-coded to
200 partitions, and the scheduler randomly chooses the executors to do the
joins.

For broadcast join, there is no shuffle. Tasks are scheduled based on the
locality of the large fact table.
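
For the shuffle case, the partition count and the key-to-partition mapping
can be known before execution even though the executor cannot. Below is a
minimal sketch, assuming rows are routed by a hash of the join key modulo the
partition count. The 200 matches the default of the
spark.sql.shuffle.partitions setting; partition_for is a made-up helper, and
zlib.crc32 is only a stand-in for whatever hash function Spark actually uses.

```python
import zlib

NUM_SHUFFLE_PARTITIONS = 200  # the value quoted in the reply above


def partition_for(key, num_partitions=NUM_SHUFFLE_PARTITIONS):
    """Map a join key to a shuffle partition (stand-in hash, not Spark's)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


orders = [(1, "order-a"), (2, "order-b"), (1, "order-c")]
customers = [(1, "alice"), (2, "bob")]

# Rows from both sides with the same key land in the same partition,
# so each join task only needs its own partition of each side.
left = {row: partition_for(row[0]) for row in orders}
right = {row: partition_for(row[0]) for row in customers}
print(left[(1, "order-a")] == right[(1, "alice")])  # prints True
```

The join stage then runs one task per shuffle partition, which is where the
200 tasks come from.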