Posted to dev@spark.apache.org by "Ulanov, Alexander" <al...@hp.com> on 2015/06/24 20:35:57 UTC

Force inner join to shuffle the smallest table

Hi,

I am trying to inner join two tables on two fields (a string and a double). One table has 2B rows, the other 500K. Both are stored in HDFS as Parquet. Spark v1.4.
val big = sqlContext.parquetFile("hdfs://big")
big.registerTempTable("big")
val small = sqlContext.parquetFile("hdfs://small")
small.registerTempTable("small")
val result = sqlContext.sql("select big.f1, big.f2 from big inner join small on big.s=small.s and big.d=small.d")

This query fails partway through because one of the workers runs out of disk space, with the shuffle write reported at 1.8TB, which is the total capacity of my Spark working dirs (across 7 worker nodes). This is surprising, because the "big" table takes 2TB of disk space (unreplicated) and "small" about 5GB, so I would expect the optimizer to shuffle the small table. How can I force Spark to shuffle the small table? I also tried writing "small inner join big", but it fails with 1.8TB of shuffle as well.
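
One thing I plan to try, sketched below but not yet verified: raising spark.sql.autoBroadcastJoinThreshold (10MB by default) so the planner can consider a broadcast join instead of shuffling either side. I have not checked whether broadcasting a table of several GB is practical on my cluster, and the 1GB value below is only an illustration of the mechanism; it may still be far below the small table's actual size.

// sketch: raise the broadcast threshold from the 10MB default, e.g. to 1GB
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (1024L * 1024 * 1024).toString)
val result = sqlContext.sql("select big.f1, big.f2 from big inner join small on big.s=small.s and big.d=small.d")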

Best regards, Alexander


RE: Force inner join to shuffle the smallest table

Posted by "Ulanov, Alexander" <al...@hp.com>.
The problem is that it shuffles the wrong table, which even compressed won't fit on my disk.

Actually, I found the source of the problem, although I could not reproduce it with synthetic data (it remains true for my original data: big table 2B rows, small 500K):

When I join on two fields, as in "select big.f1, big.f2 from small inner join big on big.s=small.s and big.d=small.d", the query planner chooses a ShuffledHashJoin that shuffles the bigger table, and the query fails because the shuffle write of the whole big table runs out of space:
== Physical Plan ==
Project [bedId#28,sensorId#30L,time#31,label#32,fft#34]
ShuffledHashJoin [time#38,bedId#35], [time#31,bedId#28], BuildLeft
  Exchange (HashPartitioning [time#38,bedId#35], 2000), []
    PhysicalRDD [bedId#35,time#38], MapPartitionsRDD[75] at explain at <console>:25
  Exchange (HashPartitioning [time#31,bedId#28], 2000), []
    PhysicalRDD [bedId#28,sensorId#30L,fft#34,label#32,time#31], MapPartitionsRDD[77] at explain at <console>:25

When I join on one field, as in "select big.f1, big.f2 from small inner join big on big.s=small.s", the query planner chooses a BroadcastHashJoin, which writes only what is needed and therefore executes without problems:

== Physical Plan ==
Project [bedId#28,sensorId#30L,time#31,label#32,fft#34]
BroadcastHashJoin [time#38], [time#31], BuildLeft
  Limit 498340
   PhysicalRDD [time#38], MapPartitionsRDD[66] at explain at <console>:25
  PhysicalRDD [bedId#28,sensorId#30L,fft#34,label#32,time#31], MapPartitionsRDD[68] at explain at <console>:25
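
For reference, the plans above come from calling explain() on the query; the comparison boils down to running something like the following (with the f1/f2/s/d placeholder names from this thread standing in for my real column names):

sqlContext.sql("select big.f1, big.f2 from small inner join big on big.s=small.s and big.d=small.d").explain()
sqlContext.sql("select big.f1, big.f2 from small inner join big on big.s=small.s").explain()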

Could the Spark SQL developers suggest why this happens?

Best regards, Alexander


From: Stephen Carman [mailto:scarman@coldlight.com]
Sent: Wednesday, June 24, 2015 12:33 PM
To: Ulanov, Alexander
Cc: CC GP; dev@spark.apache.org
Subject: Re: Force inner join to shuffle the smallest table

Have you tried shuffle compression?

spark.shuffle.compress (true|false)

Also, if you have a filesystem that supports it, I've noticed that file consolidation helps disk usage a bit.

spark.shuffle.consolidateFiles (true|false)

Steve

On Jun 24, 2015, at 3:27 PM, Ulanov, Alexander <al...@hp.com>> wrote:

It also fails, as I mentioned in the original question.

From: CC GP [mailto:chandrika.gopalakrishna@gmail.com]
Sent: Wednesday, June 24, 2015 12:08 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org<ma...@spark.apache.org>
Subject: Re: Force inner join to shuffle the smallest table

Try the query below and see if it makes a difference:

val result = sqlContext.sql("select big.f1, big.f2 from small inner join big on big.s=small.s and big.d=small.d")

On Wed, Jun 24, 2015 at 11:35 AM, Ulanov, Alexander <al...@hp.com>> wrote:
Hi,

I am trying to inner join two tables on two fields (a string and a double). One table has 2B rows, the other 500K. Both are stored in HDFS as Parquet. Spark v1.4.
val big = sqlContext.parquetFile("hdfs://big")
big.registerTempTable("big")
val small = sqlContext.parquetFile("hdfs://small")
small.registerTempTable("small")
val result = sqlContext.sql("select big.f1, big.f2 from big inner join small on big.s=small.s and big.d=small.d")

This query fails partway through because one of the workers runs out of disk space, with the shuffle write reported at 1.8TB, which is the total capacity of my Spark working dirs (across 7 worker nodes). This is surprising, because the "big" table takes 2TB of disk space (unreplicated) and "small" about 5GB, so I would expect the optimizer to shuffle the small table. How can I force Spark to shuffle the small table? I also tried writing "small inner join big", but it fails with 1.8TB of shuffle as well.

Best regards, Alexander


Re: Force inner join to shuffle the smallest table

Posted by Stephen Carman <sc...@coldlight.com>.
Have you tried shuffle compression?

spark.shuffle.compress (true|false)

Also, if you have a filesystem that supports it, I've noticed that file consolidation helps disk usage a bit.

spark.shuffle.consolidateFiles (true|false)
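
For example, just as a sketch (note that spark.shuffle.compress already defaults to true, so it is mainly worth checking that it hasn't been disabled, and if I remember right consolidateFiles only matters with the hash shuffle manager):

// sketch: set both shuffle options on the SparkConf before creating the context
val conf = new org.apache.spark.SparkConf()
  .set("spark.shuffle.compress", "true")
  .set("spark.shuffle.consolidateFiles", "true")

The same keys can also go into spark-defaults.conf.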

Steve

On Jun 24, 2015, at 3:27 PM, Ulanov, Alexander <al...@hp.com>> wrote:

It also fails, as I mentioned in the original question.

From: CC GP [mailto:chandrika.gopalakrishna@gmail.com]
Sent: Wednesday, June 24, 2015 12:08 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org<ma...@spark.apache.org>
Subject: Re: Force inner join to shuffle the smallest table

Try the query below and see if it makes a difference:

val result = sqlContext.sql("select big.f1, big.f2 from small inner join big on big.s=small.s and big.d=small.d")

On Wed, Jun 24, 2015 at 11:35 AM, Ulanov, Alexander <al...@hp.com>> wrote:
Hi,

I am trying to inner join two tables on two fields (a string and a double). One table has 2B rows, the other 500K. Both are stored in HDFS as Parquet. Spark v1.4.
val big = sqlContext.parquetFile("hdfs://big")
big.registerTempTable("big")
val small = sqlContext.parquetFile("hdfs://small")
small.registerTempTable("small")
val result = sqlContext.sql("select big.f1, big.f2 from big inner join small on big.s=small.s and big.d=small.d")

This query fails partway through because one of the workers runs out of disk space, with the shuffle write reported at 1.8TB, which is the total capacity of my Spark working dirs (across 7 worker nodes). This is surprising, because the "big" table takes 2TB of disk space (unreplicated) and "small" about 5GB, so I would expect the optimizer to shuffle the small table. How can I force Spark to shuffle the small table? I also tried writing "small inner join big", but it fails with 1.8TB of shuffle as well.

Best regards, Alexander


RE: Force inner join to shuffle the smallest table

Posted by "Ulanov, Alexander" <al...@hp.com>.
It also fails, as I mentioned in the original question.

From: CC GP [mailto:chandrika.gopalakrishna@gmail.com]
Sent: Wednesday, June 24, 2015 12:08 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Force inner join to shuffle the smallest table

Try the query below and see if it makes a difference:

val result = sqlContext.sql("select big.f1, big.f2 from small inner join big on big.s=small.s and big.d=small.d")

On Wed, Jun 24, 2015 at 11:35 AM, Ulanov, Alexander <al...@hp.com>> wrote:
Hi,

I am trying to inner join two tables on two fields (a string and a double). One table has 2B rows, the other 500K. Both are stored in HDFS as Parquet. Spark v1.4.
val big = sqlContext.parquetFile("hdfs://big")
big.registerTempTable("big")
val small = sqlContext.parquetFile("hdfs://small")
small.registerTempTable("small")
val result = sqlContext.sql("select big.f1, big.f2 from big inner join small on big.s=small.s and big.d=small.d")

This query fails partway through because one of the workers runs out of disk space, with the shuffle write reported at 1.8TB, which is the total capacity of my Spark working dirs (across 7 worker nodes). This is surprising, because the "big" table takes 2TB of disk space (unreplicated) and "small" about 5GB, so I would expect the optimizer to shuffle the small table. How can I force Spark to shuffle the small table? I also tried writing "small inner join big", but it fails with 1.8TB of shuffle as well.

Best regards, Alexander



Re: Force inner join to shuffle the smallest table

Posted by CC GP <ch...@gmail.com>.
Try the query below and see if it makes a difference:

val result = sqlContext.sql("select big.f1, big.f2 from small inner join big on big.s=small.s and big.d=small.d")

On Wed, Jun 24, 2015 at 11:35 AM, Ulanov, Alexander <alexander.ulanov@hp.com
> wrote:

>  Hi,
>
>
>
> I am trying to inner join two tables on two fields (a string and a
> double). One table has 2B rows, the other 500K. Both are stored in HDFS
> as Parquet. Spark v1.4.
>
> val big = sqlContext.parquetFile("hdfs://big")
>
> big.registerTempTable("big")
>
> val small = sqlContext.parquetFile("hdfs://small")
>
> small.registerTempTable("small")
>
> val result = sqlContext.sql("select big.f1, big.f2 from big inner join
> small on big.s=small.s and big.d=small.d")
>
>
>
> This query fails partway through because one of the workers runs out of
> disk space, with the shuffle write reported at 1.8TB, which is the total
> capacity of my Spark working dirs (across 7 worker nodes). This is
> surprising, because the "big" table takes 2TB of disk space (unreplicated)
> and "small" about 5GB, so I would expect the optimizer to shuffle the
> small table. How can I force Spark to shuffle the small table? I also
> tried writing "small inner join big", but it fails with 1.8TB of shuffle
> as well.
>
>
>
> Best regards, Alexander
>
>
>