Posted to user@spark.apache.org by Jacek Laskowski <ja...@japila.pl> on 2017/12/16 11:40:03 UTC

How to...UNION ALL of two SELECTs over different data sources in parallel?

Hi,

I've been trying to find out the answer to the question about UNION ALL and
SELECTs @ https://stackoverflow.com/q/47837955/1305344

> If I have a Spark SQL statement of the form SELECT [...] UNION ALL SELECT
> [...], will the two SELECT statements be executed in parallel? In my
> specific use case the two SELECTs are querying two different database
> tables. In contrast to what I would have expected, the Spark UI seems to
> suggest that the two SELECT statements are performed sequentially.

How can I tell whether the two separate SELECTs are executed in parallel?
What tools would show that?

My answer was to use the explain operator, which shows...well...the physical
plan, but I'm not sure how to read it to tell whether a query plan is going
to be executed in parallel or not.
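
For reference, a minimal sketch of that approach (the exact plan text differs
across Spark versions, so the output below is only indicative; the thing to
look for is the Union node with the two child subtrees, each prefixed with *
once whole-stage codegen kicks in):

spark.range(1).union(spark.range(2)).explain
// == Physical Plan ==
// Union
// :- *Range (...)   <- * marks a whole-stage-codegen'd subtree
// +- *Range (...)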

I then looked at the underlying RDD lineage (using rdd.toDebugString) hoping
it would give me the answer, but...I'm not so sure.

For a query like the following:

val q = spark.range(1).union(spark.range(2))

I thought that since both SELECTs are codegen'd they could be executed in
parallel, but when I switched to the RDD lineage I lost my confidence, given
there's just one single stage (!)

scala> q.rdd.toDebugString
res4: String =
(16) MapPartitionsRDD[17] at rdd at <console>:26 []
 |   MapPartitionsRDD[16] at rdd at <console>:26 []
 |   UnionRDD[15] at rdd at <console>:26 []
 |   MapPartitionsRDD[11] at rdd at <console>:26 []
 |   MapPartitionsRDD[10] at rdd at <console>:26 []
 |   ParallelCollectionRDD[9] at rdd at <console>:26 []
 |   MapPartitionsRDD[14] at rdd at <console>:26 []
 |   MapPartitionsRDD[13] at rdd at <console>:26 []
 |   ParallelCollectionRDD[12] at rdd at <console>:26 []

What am I missing, and how can I be certain whether (and which parts of) a
query will be executed in parallel?

Please help...

Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

Re: How to...UNION ALL of two SELECTs over different data sources in parallel?

Posted by Jacek Laskowski <ja...@japila.pl>.
Thanks Silvio!

In the meantime, with the help of Adam and a code review of
WholeStageCodegenExec and CollapseCodegenStages, I found out that anything
that's codegen'd runs as part of the tasks of a stage, and the tasks of a
stage run in parallel. In this case, the union of two codegen'd subtrees is
indeed parallel.
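
For anyone curious, a quick way to look at those codegen'd subtrees (a
sketch, assuming Spark 2.x and the debug helpers from
org.apache.spark.sql.execution.debug):

import org.apache.spark.sql.execution.debug._
val q = spark.range(1).union(spark.range(2))
q.debugCodegen()
// prints the generated Java source for every WholeStageCodegen subtree;
// the two Range subtrees under the Union show up separately, and both
// run inside the tasks of the same stage.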

Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

Re: How to...UNION ALL of two SELECTs over different data sources in parallel?

Posted by Silvio Fiorito <si...@granturing.com>.
Hi Jacek,

Just replied to the SO thread as well, but…

Yes, your first statement is correct. The DFs in the union are read in the same stage, so in your example, where each DF has 8 partitions, you get a single stage with 16 tasks reading the 2 DFs. There's no need to define the DFs in separate threads. You can also verify this in the Stage UI by looking at the Event Timeline; you should see the tasks across the DFs executing in parallel as expected.
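
As a rough sanity check (sketched here with spark.range rather than your JDBC tables): the union introduces no shuffle, so the unioned Dataset simply carries the partitions of both inputs, and all of them become tasks of one stage.

val df1 = spark.range(0, 100, 1, 8)   // 8 partitions
val df2 = spark.range(100, 200, 1, 8) // 8 partitions
df1.union(df2).rdd.getNumPartitions   // 16 -- one stage, 16 read tasks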

Here’s the UI for the following example, in which each DF has only 1 partition (so we get a stage with 2 tasks):

spark.range(1, 100, 1, 1).write.save("/tmp/df1")
spark.range(101, 200, 1, 1).write.save("/tmp/df2")

spark.read.load("/tmp/df1").union(spark.read.load("/tmp/df2")).foreach { _ => }

[Attached screenshot: Spark UI Event Timeline for the stage with the 2 read tasks]
