Posted to user@spark.apache.org by Tamara Mendt <tm...@hellofresh.com> on 2016/02/19 18:31:05 UTC

Spark Job Hanging on Join

Hi all,

I am running a Spark job that gets stuck attempting to join two dataframes.
The dataframes are not very large: one is about 2 million rows, the other a
couple of thousand rows, and the resulting joined dataframe should be about
the same size as the smaller dataframe. I have tried triggering execution
of the join using the 'first' operator, which as far as I understand should
not require processing the entire resulting dataframe (maybe I am mistaken
though). The Spark UI is not telling me anything, just showing the task as
stuck.
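
Roughly, the relevant part of the job looks like this (a simplified sketch
assuming the usual sqlContext; the dataframe, column and path names here
are placeholders, not the real ones):

    # Simplified PySpark sketch of the join described above.
    big_df = sqlContext.read.parquet("hdfs:///path/to/big_table")      # ~2 million rows
    small_df = sqlContext.read.parquet("hdfs:///path/to/small_table")  # a few thousand rows

    joined = big_df.join(small_df,
                         big_df["key"] == small_df["key"],
                         "left_outer")

    # 'first' only needs one row, but Spark still has to plan and start
    # executing the join before anything comes back.
    print(joined.first())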

When I run the exact same job on a slightly smaller dataset it works
without hanging.

I have used the same environment to run joins on much larger dataframes, so
I am confused as to why in this particular case my Spark job is just
hanging. I have also tried running the same join operation using pyspark on
two 2-million-row dataframes (exactly like the one I am trying to join in
the job that gets stuck) and it runs successfully.

I have tried caching the joined dataframe to see how much memory it
requires, but the job gets stuck on this action too. I have also tried
persisting the join to memory and disk, and the job seems to be stuck all
the same.
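
The caching attempts look roughly like this (simplified; 'joined' is the
joined dataframe from the sketch above):

    # First attempt: cache and force materialization (hangs on the count):
    joined.cache()
    joined.count()

    # Separate attempt, in a fresh run, persisting to memory and disk
    # (hangs the same way):
    # from pyspark import StorageLevel
    # joined.persist(StorageLevel.MEMORY_AND_DISK)
    # joined.count()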

Any help as to where to look for the source of the problem would be much
appreciated.

Cheers,

Tamara

Re: Spark Job Hanging on Join

Posted by Dave Moyers <da...@icloud.com>.
Congrats!

Sent from my iPad

> On Feb 23, 2016, at 2:43 AM, Mohannad Ali <ma...@gmail.com> wrote:
> 
> Hello Everyone,
> 
> Thanks a lot for the help. We also managed to solve it but without resorting to spark 1.6.
> 
> The problem we were having was because of a really bad join condition:
> 
> ON ((a.col1 = b.col1) or (a.col1 is null and b.col1 is null)) AND ((a.col2 = b.col2) or (a.col2 is null and b.col2 is null))
> 
> So what we did was re-work our logic to remove the null checks in the join condition and the join went lightning fast afterwards :)
> 
> On Feb 22, 2016 21:24, "Dave Moyers" <da...@icloud.com> wrote:
>> Good article! Thanks for sharing!
>> 
>> 
>> > On Feb 22, 2016, at 11:10 AM, Davies Liu <da...@databricks.com> wrote:
>> >
>> > This link may help:
>> > https://forums.databricks.com/questions/6747/how-do-i-get-a-cartesian-product-of-a-huge-dataset.html
>> >
>> > Spark 1.6 improved CartesianProduct; you should turn off auto
>> > broadcast and go with CartesianProduct in 1.6.
>> >
>> > On Mon, Feb 22, 2016 at 1:45 AM, Mohannad Ali <ma...@gmail.com> wrote:
>> >> Hello everyone,
>> >>
>> >> I'm working with Tamara and I wanted to give you guys an update on the
>> >> issue:
>> >>
>> >> 1. Here is the output of .explain():
>> >>>
>> >>> Project
>> >>> [sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L,customer_id#25L
>> >>> AS new_customer_id#38L,country#24 AS new_country#39,email#26 AS
>> >>> new_email#40,birthdate#29 AS new_birthdate#41,gender#31 AS
>> >>> new_gender#42,fk_created_at_date#32 AS
>> >>> new_fk_created_at_date#43,age_range#30 AS new_age_range#44,first_name#27 AS
>> >>> new_first_name#45,last_name#28 AS new_last_name#46]
>> >>> BroadcastNestedLoopJoin BuildLeft, LeftOuter, Some((((customer_id#1L =
>> >>> customer_id#25L) || (isnull(customer_id#1L) && isnull(customer_id#25L))) &&
>> >>> ((country#2 = country#24) || (isnull(country#2) && isnull(country#24)))))
>> >>>  Scan
>> >>> PhysicalRDD[country#24,customer_id#25L,email#26,first_name#27,last_name#28,birthdate#29,age_range#30,gender#31,fk_created_at_date#32]
>> >>>  Scan
>> >>> ParquetRelation[hdfs:///databases/dimensions/customer_dimension][sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L]
>> >>
>> >>
>> >> 2. Setting spark.sql.autoBroadcastJoinThreshold=-1 didn't make a difference.
>> >> It still hangs indefinitely.
>> >> 3. We are using Spark 1.5.2
>> >> 4. We tried running this with 4 executors, 9 executors, and even in local
>> >> mode with master set to "local[4]". The issue still persists in all cases.
>> >> 5. Even without trying to cache any of the dataframes this issue still
>> >> happens.
>> >> 6. We have about 200 partitions.
>> >>
>> >> Any help would be appreciated!
>> >>
>> >> Best Regards,
>> >> Mo
>> >>
>> >> On Sun, Feb 21, 2016 at 8:39 PM, Gourav Sengupta <go...@gmail.com>
>> >> wrote:
>> >>>
>> >>> Sorry,
>> >>>
>> >>> please include the following questions to the list above:
>> >>>
>> >>> the SPARK version?
>> >>> whether you are using RDD or DataFrames?
>> >>> is the code run locally or in SPARK Cluster mode or in AWS EMR?
>> >>>
>> >>>
>> >>> Regards,
>> >>> Gourav Sengupta
>> >>>
>> >>> On Sun, Feb 21, 2016 at 7:37 PM, Gourav Sengupta
>> >>> <go...@gmail.com> wrote:
>> >>>>
>> >>>> Hi Tamara,
>> >>>>
>> >>>> few basic questions first.
>> >>>>
>> >>>> How many executors are you using?
>> >>>> Is the data getting all cached into the same executor?
>> >>>> How many partitions do you have of the data?
>> >>>> How many fields are you trying to use in the join?
>> >>>>
>> >>>> If you need any help in finding answer to these questions please let me
>> >>>> know. From what I reckon joins like yours should not take more than a few
>> >>>> milliseconds.
>> >>>>
>> >>>>
>> >>>> Regards,
>> >>>> Gourav Sengupta
>> >>>>
>> >>>> On Fri, Feb 19, 2016 at 5:31 PM, Tamara Mendt <tm...@hellofresh.com> wrote:
>> >>>>>
>> >>>>> Hi all,
>> >>>>>
>> >>>>> I am running a Spark job that gets stuck attempting to join two
>> >>>>> dataframes. The dataframes are not very large, one is about 2 M rows, and
>> >>>>> the other a couple of thousand rows and the resulting joined dataframe
>> >>>>> should be about the same size as the smaller dataframe. I have tried
>> >>>>> triggering execution of the join using the 'first' operator, which as far as
>> >>>>> I understand would not require processing the entire resulting dataframe
>> >>>>> (maybe I am mistaken though). The Spark UI is not telling me anything, just
>> >>>>> showing the task to be stuck.
>> >>>>>
>> >>>>> When I run the exact same job on a slightly smaller dataset it works
>> >>>>> without hanging.
>> >>>>>
>> >>>>> I have used the same environment to run joins on much larger dataframes,
>> >>>>> so I am confused as to why in this particular case my Spark job is just
>> >>>>> hanging. I have also tried running the same join operation using pyspark on
>> >>>>> two 2 Million row dataframes (exactly like the one I am trying to join in
>> >>>>> the job that gets stuck) and it runs successfully.
>> >>>>>
>> >>>>> I have tried caching the joined dataframe to see how much memory it is
>> >>>>> requiring but the job gets stuck on this action too. I have also tried using
>> >>>>> persist to memory and disk on the join, and the job seems to be stuck all
>> >>>>> the same.
>> >>>>>
>> >>>>> Any help as to where to look for the source of the problem would be much
>> >>>>> appreciated.
>> >>>>>
>> >>>>> Cheers,
>> >>>>>
>> >>>>> Tamara
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>> > ---------------------------------------------------------------------
>> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> > For additional commands, e-mail: user-help@spark.apache.org
>> >

Re: Spark Job Hanging on Join

Posted by Alonso Isidoro Roman <al...@gmail.com>.
Thanks for sharing the know-how, guys.

Alonso Isidoro Roman.

My favorite quotes (today):
"If debugging is the process of removing software bugs, then programming
must be the process of putting them in..."
  - Edsger Dijkstra

"If you pay peanuts you get monkeys"


2016-02-23 9:43 GMT+01:00 Mohannad Ali <ma...@gmail.com>:

> Hello Everyone,
>
> Thanks a lot for the help. We also managed to solve it but without
> resorting to spark 1.6.
>
> The problem we were having was because of a really bad join condition:
>
> ON ((a.col1 = b.col1) or (a.col1 is null and b.col1 is null)) AND ((a.col2
> = b.col2) or (a.col2 is null and b.col2 is null))
>
> So what we did was re-work our logic to remove the null checks in the join
> condition and the join went lightning fast afterwards :)
> On Feb 22, 2016 21:24, "Dave Moyers" <da...@icloud.com> wrote:
>
>> Good article! Thanks for sharing!
>>
>>
>> > On Feb 22, 2016, at 11:10 AM, Davies Liu <da...@databricks.com> wrote:
>> >
>> > This link may help:
>> >
>> https://forums.databricks.com/questions/6747/how-do-i-get-a-cartesian-product-of-a-huge-dataset.html
>> >
>> > Spark 1.6 improved CartesianProduct; you should turn off auto
>> > broadcast and go with CartesianProduct in 1.6.
>> >
>> > On Mon, Feb 22, 2016 at 1:45 AM, Mohannad Ali <ma...@gmail.com> wrote:
>> >> Hello everyone,
>> >>
>> >> I'm working with Tamara and I wanted to give you guys an update on the
>> >> issue:
>> >>
>> >> 1. Here is the output of .explain():
>> >>>
>> >>> Project
>> >>>
>> [sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L,customer_id#25L
>> >>> AS new_customer_id#38L,country#24 AS new_country#39,email#26 AS
>> >>> new_email#40,birthdate#29 AS new_birthdate#41,gender#31 AS
>> >>> new_gender#42,fk_created_at_date#32 AS
>> >>> new_fk_created_at_date#43,age_range#30 AS
>> new_age_range#44,first_name#27 AS
>> >>> new_first_name#45,last_name#28 AS new_last_name#46]
>> >>> BroadcastNestedLoopJoin BuildLeft, LeftOuter, Some((((customer_id#1L =
>> >>> customer_id#25L) || (isnull(customer_id#1L) &&
>> isnull(customer_id#25L))) &&
>> >>> ((country#2 = country#24) || (isnull(country#2) &&
>> isnull(country#24)))))
>> >>>  Scan
>> >>>
>> PhysicalRDD[country#24,customer_id#25L,email#26,first_name#27,last_name#28,birthdate#29,age_range#30,gender#31,fk_created_at_date#32]
>> >>>  Scan
>> >>>
>> ParquetRelation[hdfs:///databases/dimensions/customer_dimension][sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L]
>> >>
>> >>
>> >> 2. Setting spark.sql.autoBroadcastJoinThreshold=-1 didn't make a
>> difference.
>> >> It still hangs indefinitely.
>> >> 3. We are using Spark 1.5.2
>> >> 4. We tried running this with 4 executors, 9 executors, and even in
>> local
>> >> mode with master set to "local[4]". The issue still persists in all
>> cases.
>> >> 5. Even without trying to cache any of the dataframes this issue still
>> >> happens.
>> >> 6. We have about 200 partitions.
>> >>
>> >> Any help would be appreciated!
>> >>
>> >> Best Regards,
>> >> Mo
>> >>
>> >> On Sun, Feb 21, 2016 at 8:39 PM, Gourav Sengupta <
>> gourav.sengupta@gmail.com>
>> >> wrote:
>> >>>
>> >>> Sorry,
>> >>>
>> >>> please include the following questions to the list above:
>> >>>
>> >>> the SPARK version?
>> >>> whether you are using RDD or DataFrames?
>> >>> is the code run locally or in SPARK Cluster mode or in AWS EMR?
>> >>>
>> >>>
>> >>> Regards,
>> >>> Gourav Sengupta
>> >>>
>> >>> On Sun, Feb 21, 2016 at 7:37 PM, Gourav Sengupta
>> >>> <go...@gmail.com> wrote:
>> >>>>
>> >>>> Hi Tamara,
>> >>>>
>> >>>> few basic questions first.
>> >>>>
>> >>>> How many executors are you using?
>> >>>> Is the data getting all cached into the same executor?
>> >>>> How many partitions do you have of the data?
>> >>>> How many fields are you trying to use in the join?
>> >>>>
>> >>>> If you need any help in finding answer to these questions please let
>> me
>> >>>> know. From what I reckon joins like yours should not take more than
>> a few
>> >>>> milliseconds.
>> >>>>
>> >>>>
>> >>>> Regards,
>> >>>> Gourav Sengupta
>> >>>>
>> >>>> On Fri, Feb 19, 2016 at 5:31 PM, Tamara Mendt <tm...@hellofresh.com>
>> wrote:
>> >>>>>
>> >>>>> Hi all,
>> >>>>>
>> >>>>> I am running a Spark job that gets stuck attempting to join two
>> >>>>> dataframes. The dataframes are not very large, one is about 2 M
>> rows, and
>> >>>>> the other a couple of thousand rows and the resulting joined
>> dataframe
>> >>>>> should be about the same size as the smaller dataframe. I have tried
>> >>>>> triggering execution of the join using the 'first' operator, which
>> as far as
>> >>>>> I understand would not require processing the entire resulting
>> dataframe
>> >>>>> (maybe I am mistaken though). The Spark UI is not telling me
>> anything, just
>> >>>>> showing the task to be stuck.
>> >>>>>
>> >>>>> When I run the exact same job on a slightly smaller dataset it works
>> >>>>> without hanging.
>> >>>>>
>> >>>>> I have used the same environment to run joins on much larger
>> dataframes,
>> >>>>> so I am confused as to why in this particular case my Spark job is
>> just
>> >>>>> hanging. I have also tried running the same join operation using
>> pyspark on
>> >>>>> two 2 Million row dataframes (exactly like the one I am trying to
>> join in
>> >>>>> the job that gets stuck) and it runs successfully.
>> >>>>>
>> >>>>> I have tried caching the joined dataframe to see how much memory it
>> is
>> >>>>> requiring but the job gets stuck on this action too. I have also
>> tried using
>> >>>>> persist to memory and disk on the join, and the job seems to be
>> stuck all
>> >>>>> the same.
>> >>>>>
>> >>>>> Any help as to where to look for the source of the problem would be
>> much
>> >>>>> appreciated.
>> >>>>>
>> >>>>> Cheers,
>> >>>>>
>> >>>>> Tamara
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>> > ---------------------------------------------------------------------
>> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> > For additional commands, e-mail: user-help@spark.apache.org
>> >
>>
>>

Re: Spark Job Hanging on Join

Posted by Mohannad Ali <ma...@gmail.com>.
Hello Everyone,

Thanks a lot for the help. We also managed to solve it, but without
resorting to Spark 1.6.

The problem we were having was caused by a really bad join condition:

ON ((a.col1 = b.col1) or (a.col1 is null and b.col1 is null)) AND ((a.col2
= b.col2) or (a.col2 is null and b.col2 is null))

So we reworked our logic to remove the null checks from the join
condition, and the join went lightning fast afterwards :)
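
For anyone hitting the same thing, here is a rough sketch of one way to
keep the null-matching behaviour without the OR/IS NULL pattern (this is
an illustration with made-up names, not our exact code): coalesce the
nullable keys to sentinel values that never occur in the real data, so the
condition stays a plain equi-join instead of a predicate that forces a
BroadcastNestedLoopJoin.

    from pyspark.sql import functions as F

    # a and b stand for the two dataframes from the condition above.
    # The sentinels (-1, "__NULL__") are assumed never to appear in the
    # real data, so NULL effectively matches NULL.
    a2 = (a.withColumn("col1_key", F.coalesce(a["col1"], F.lit(-1)))
           .withColumn("col2_key", F.coalesce(a["col2"], F.lit("__NULL__"))))
    b2 = (b.withColumn("col1_key", F.coalesce(b["col1"], F.lit(-1)))
           .withColumn("col2_key", F.coalesce(b["col2"], F.lit("__NULL__"))))

    cond = ((a2["col1_key"] == b2["col1_key"]) &
            (a2["col2_key"] == b2["col2_key"]))
    joined = a2.join(b2, cond, "left_outer")
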
On Feb 22, 2016 21:24, "Dave Moyers" <da...@icloud.com> wrote:

> Good article! Thanks for sharing!
>
>
> > On Feb 22, 2016, at 11:10 AM, Davies Liu <da...@databricks.com> wrote:
> >
> > This link may help:
> >
> https://forums.databricks.com/questions/6747/how-do-i-get-a-cartesian-product-of-a-huge-dataset.html
> >
> > Spark 1.6 improved CartesianProduct; you should turn off auto
> > broadcast and go with CartesianProduct in 1.6.
> >
> > On Mon, Feb 22, 2016 at 1:45 AM, Mohannad Ali <ma...@gmail.com> wrote:
> >> Hello everyone,
> >>
> >> I'm working with Tamara and I wanted to give you guys an update on the
> >> issue:
> >>
> >> 1. Here is the output of .explain():
> >>>
> >>> Project
> >>>
> [sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L,customer_id#25L
> >>> AS new_customer_id#38L,country#24 AS new_country#39,email#26 AS
> >>> new_email#40,birthdate#29 AS new_birthdate#41,gender#31 AS
> >>> new_gender#42,fk_created_at_date#32 AS
> >>> new_fk_created_at_date#43,age_range#30 AS
> new_age_range#44,first_name#27 AS
> >>> new_first_name#45,last_name#28 AS new_last_name#46]
> >>> BroadcastNestedLoopJoin BuildLeft, LeftOuter, Some((((customer_id#1L =
> >>> customer_id#25L) || (isnull(customer_id#1L) &&
> isnull(customer_id#25L))) &&
> >>> ((country#2 = country#24) || (isnull(country#2) &&
> isnull(country#24)))))
> >>>  Scan
> >>>
> PhysicalRDD[country#24,customer_id#25L,email#26,first_name#27,last_name#28,birthdate#29,age_range#30,gender#31,fk_created_at_date#32]
> >>>  Scan
> >>>
> ParquetRelation[hdfs:///databases/dimensions/customer_dimension][sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L]
> >>
> >>
> >> 2. Setting spark.sql.autoBroadcastJoinThreshold=-1 didn't make a
> difference.
> >> It still hangs indefinitely.
> >> 3. We are using Spark 1.5.2
> >> 4. We tried running this with 4 executors, 9 executors, and even in
> local
> >> mode with master set to "local[4]". The issue still persists in all
> cases.
> >> 5. Even without trying to cache any of the dataframes this issue still
> >> happens.
> >> 6. We have about 200 partitions.
> >>
> >> Any help would be appreciated!
> >>
> >> Best Regards,
> >> Mo
> >>
> >> On Sun, Feb 21, 2016 at 8:39 PM, Gourav Sengupta <
> gourav.sengupta@gmail.com>
> >> wrote:
> >>>
> >>> Sorry,
> >>>
> >>> please include the following questions to the list above:
> >>>
> >>> the SPARK version?
> >>> whether you are using RDD or DataFrames?
> >>> is the code run locally or in SPARK Cluster mode or in AWS EMR?
> >>>
> >>>
> >>> Regards,
> >>> Gourav Sengupta
> >>>
> >>> On Sun, Feb 21, 2016 at 7:37 PM, Gourav Sengupta
> >>> <go...@gmail.com> wrote:
> >>>>
> >>>> Hi Tamara,
> >>>>
> >>>> few basic questions first.
> >>>>
> >>>> How many executors are you using?
> >>>> Is the data getting all cached into the same executor?
> >>>> How many partitions do you have of the data?
> >>>> How many fields are you trying to use in the join?
> >>>>
> >>>> If you need any help in finding answer to these questions please let
> me
> >>>> know. From what I reckon joins like yours should not take more than a
> few
> >>>> milliseconds.
> >>>>
> >>>>
> >>>> Regards,
> >>>> Gourav Sengupta
> >>>>
> >>>> On Fri, Feb 19, 2016 at 5:31 PM, Tamara Mendt <tm...@hellofresh.com>
> wrote:
> >>>>>
> >>>>> Hi all,
> >>>>>
> >>>>> I am running a Spark job that gets stuck attempting to join two
> >>>>> dataframes. The dataframes are not very large, one is about 2 M
> rows, and
> >>>>> the other a couple of thousand rows and the resulting joined
> dataframe
> >>>>> should be about the same size as the smaller dataframe. I have tried
> >>>>> triggering execution of the join using the 'first' operator, which
> as far as
> >>>>> I understand would not require processing the entire resulting
> dataframe
> >>>>> (maybe I am mistaken though). The Spark UI is not telling me
> anything, just
> >>>>> showing the task to be stuck.
> >>>>>
> >>>>> When I run the exact same job on a slightly smaller dataset it works
> >>>>> without hanging.
> >>>>>
> >>>>> I have used the same environment to run joins on much larger
> dataframes,
> >>>>> so I am confused as to why in this particular case my Spark job is
> just
> >>>>> hanging. I have also tried running the same join operation using
> pyspark on
> >>>>> two 2 Million row dataframes (exactly like the one I am trying to
> join in
> >>>>> the job that gets stuck) and it runs successfully.
> >>>>>
> >>>>> I have tried caching the joined dataframe to see how much memory it
> is
> >>>>> requiring but the job gets stuck on this action too. I have also
> tried using
> >>>>> persist to memory and disk on the join, and the job seems to be
> stuck all
> >>>>> the same.
> >>>>>
> >>>>> Any help as to where to look for the source of the problem would be
> much
> >>>>> appreciated.
> >>>>>
> >>>>> Cheers,
> >>>>>
> >>>>> Tamara
> >>>>>
> >>>>
> >>>
> >>
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> > For additional commands, e-mail: user-help@spark.apache.org
> >
>
>

Re: Spark Job Hanging on Join

Posted by Dave Moyers <da...@icloud.com>.
Good article! Thanks for sharing!


> On Feb 22, 2016, at 11:10 AM, Davies Liu <da...@databricks.com> wrote:
> 
> This link may help:
> https://forums.databricks.com/questions/6747/how-do-i-get-a-cartesian-product-of-a-huge-dataset.html
> 
> Spark 1.6 improved CartesianProduct; you should turn off auto
> broadcast and go with CartesianProduct in 1.6.
> 
> On Mon, Feb 22, 2016 at 1:45 AM, Mohannad Ali <ma...@gmail.com> wrote:
>> Hello everyone,
>> 
>> I'm working with Tamara and I wanted to give you guys an update on the
>> issue:
>> 
>> 1. Here is the output of .explain():
>>> 
>>> Project
>>> [sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L,customer_id#25L
>>> AS new_customer_id#38L,country#24 AS new_country#39,email#26 AS
>>> new_email#40,birthdate#29 AS new_birthdate#41,gender#31 AS
>>> new_gender#42,fk_created_at_date#32 AS
>>> new_fk_created_at_date#43,age_range#30 AS new_age_range#44,first_name#27 AS
>>> new_first_name#45,last_name#28 AS new_last_name#46]
>>> BroadcastNestedLoopJoin BuildLeft, LeftOuter, Some((((customer_id#1L =
>>> customer_id#25L) || (isnull(customer_id#1L) && isnull(customer_id#25L))) &&
>>> ((country#2 = country#24) || (isnull(country#2) && isnull(country#24)))))
>>>  Scan
>>> PhysicalRDD[country#24,customer_id#25L,email#26,first_name#27,last_name#28,birthdate#29,age_range#30,gender#31,fk_created_at_date#32]
>>>  Scan
>>> ParquetRelation[hdfs:///databases/dimensions/customer_dimension][sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L]
>> 
>> 
>> 2. Setting spark.sql.autoBroadcastJoinThreshold=-1 didn't make a difference.
>> It still hangs indefinitely.
>> 3. We are using Spark 1.5.2
>> 4. We tried running this with 4 executors, 9 executors, and even in local
>> mode with master set to "local[4]". The issue still persists in all cases.
>> 5. Even without trying to cache any of the dataframes this issue still
>> happens.
>> 6. We have about 200 partitions.
>> 
>> Any help would be appreciated!
>> 
>> Best Regards,
>> Mo
>> 
>> On Sun, Feb 21, 2016 at 8:39 PM, Gourav Sengupta <go...@gmail.com>
>> wrote:
>>> 
>>> Sorry,
>>> 
>>> please include the following questions to the list above:
>>> 
>>> the SPARK version?
>>> whether you are using RDD or DataFrames?
>>> is the code run locally or in SPARK Cluster mode or in AWS EMR?
>>> 
>>> 
>>> Regards,
>>> Gourav Sengupta
>>> 
>>> On Sun, Feb 21, 2016 at 7:37 PM, Gourav Sengupta
>>> <go...@gmail.com> wrote:
>>>> 
>>>> Hi Tamara,
>>>> 
>>>> few basic questions first.
>>>> 
>>>> How many executors are you using?
>>>> Is the data getting all cached into the same executor?
>>>> How many partitions do you have of the data?
>>>> How many fields are you trying to use in the join?
>>>> 
>>>> If you need any help in finding answer to these questions please let me
>>>> know. From what I reckon joins like yours should not take more than a few
>>>> milliseconds.
>>>> 
>>>> 
>>>> Regards,
>>>> Gourav Sengupta
>>>> 
>>>> On Fri, Feb 19, 2016 at 5:31 PM, Tamara Mendt <tm...@hellofresh.com> wrote:
>>>>> 
>>>>> Hi all,
>>>>> 
>>>>> I am running a Spark job that gets stuck attempting to join two
>>>>> dataframes. The dataframes are not very large, one is about 2 M rows, and
>>>>> the other a couple of thousand rows and the resulting joined dataframe
>>>>> should be about the same size as the smaller dataframe. I have tried
>>>>> triggering execution of the join using the 'first' operator, which as far as
>>>>> I understand would not require processing the entire resulting dataframe
>>>>> (maybe I am mistaken though). The Spark UI is not telling me anything, just
>>>>> showing the task to be stuck.
>>>>> 
>>>>> When I run the exact same job on a slightly smaller dataset it works
>>>>> without hanging.
>>>>> 
>>>>> I have used the same environment to run joins on much larger dataframes,
>>>>> so I am confused as to why in this particular case my Spark job is just
>>>>> hanging. I have also tried running the same join operation using pyspark on
>>>>> two 2 Million row dataframes (exactly like the one I am trying to join in
>>>>> the job that gets stuck) and it runs successfully.
>>>>> 
>>>>> I have tried caching the joined dataframe to see how much memory it is
>>>>> requiring but the job gets stuck on this action too. I have also tried using
>>>>> persist to memory and disk on the join, and the job seems to be stuck all
>>>>> the same.
>>>>> 
>>>>> Any help as to where to look for the source of the problem would be much
>>>>> appreciated.
>>>>> 
>>>>> Cheers,
>>>>> 
>>>>> Tamara
>>>>> 
>>>> 
>>> 
>> 
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
> 


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark Job Hanging on Join

Posted by Davies Liu <da...@databricks.com>.
This link may help:
https://forums.databricks.com/questions/6747/how-do-i-get-a-cartesian-product-of-a-huge-dataset.html

Spark 1.6 improved CartesianProduct; you should turn off auto broadcast
and go with CartesianProduct in 1.6.
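
For example, in PySpark (a sketch only, assuming the usual sqlContext;
df1, df2 and join_cond are placeholders for your dataframes and condition):

    # Disable automatic broadcast joins, then re-check which physical
    # join operator shows up in the plan.
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")
    df1.join(df2, join_cond, "left_outer").explain()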

On Mon, Feb 22, 2016 at 1:45 AM, Mohannad Ali <ma...@gmail.com> wrote:
> Hello everyone,
>
> I'm working with Tamara and I wanted to give you guys an update on the
> issue:
>
> 1. Here is the output of .explain():
>>
>> Project
>> [sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L,customer_id#25L
>> AS new_customer_id#38L,country#24 AS new_country#39,email#26 AS
>> new_email#40,birthdate#29 AS new_birthdate#41,gender#31 AS
>> new_gender#42,fk_created_at_date#32 AS
>> new_fk_created_at_date#43,age_range#30 AS new_age_range#44,first_name#27 AS
>> new_first_name#45,last_name#28 AS new_last_name#46]
>>  BroadcastNestedLoopJoin BuildLeft, LeftOuter, Some((((customer_id#1L =
>> customer_id#25L) || (isnull(customer_id#1L) && isnull(customer_id#25L))) &&
>> ((country#2 = country#24) || (isnull(country#2) && isnull(country#24)))))
>>   Scan
>> PhysicalRDD[country#24,customer_id#25L,email#26,first_name#27,last_name#28,birthdate#29,age_range#30,gender#31,fk_created_at_date#32]
>>   Scan
>> ParquetRelation[hdfs:///databases/dimensions/customer_dimension][sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L]
>
>
> 2. Setting spark.sql.autoBroadcastJoinThreshold=-1 didn't make a difference.
> It still hangs indefinitely.
> 3. We are using Spark 1.5.2
> 4. We tried running this with 4 executors, 9 executors, and even in local
> mode with master set to "local[4]". The issue still persists in all cases.
> 5. Even without trying to cache any of the dataframes this issue still
> happens.
> 6. We have about 200 partitions.
>
> Any help would be appreciated!
>
> Best Regards,
> Mo
>
> On Sun, Feb 21, 2016 at 8:39 PM, Gourav Sengupta <go...@gmail.com>
> wrote:
>>
>> Sorry,
>>
>> please include the following questions to the list above:
>>
>> the SPARK version?
>> whether you are using RDD or DataFrames?
>> is the code run locally or in SPARK Cluster mode or in AWS EMR?
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Sun, Feb 21, 2016 at 7:37 PM, Gourav Sengupta
>> <go...@gmail.com> wrote:
>>>
>>> Hi Tamara,
>>>
>>> few basic questions first.
>>>
>>> How many executors are you using?
>>> Is the data getting all cached into the same executor?
>>> How many partitions do you have of the data?
>>> How many fields are you trying to use in the join?
>>>
>>> If you need any help in finding answer to these questions please let me
>>> know. From what I reckon joins like yours should not take more than a few
>>> milliseconds.
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Fri, Feb 19, 2016 at 5:31 PM, Tamara Mendt <tm...@hellofresh.com> wrote:
>>>>
>>>> Hi all,
>>>>
>>>> I am running a Spark job that gets stuck attempting to join two
>>>> dataframes. The dataframes are not very large, one is about 2 M rows, and
>>>> the other a couple of thousand rows and the resulting joined dataframe
>>>> should be about the same size as the smaller dataframe. I have tried
>>>> triggering execution of the join using the 'first' operator, which as far as
>>>> I understand would not require processing the entire resulting dataframe
>>>> (maybe I am mistaken though). The Spark UI is not telling me anything, just
>>>> showing the task to be stuck.
>>>>
>>>> When I run the exact same job on a slightly smaller dataset it works
>>>> without hanging.
>>>>
>>>> I have used the same environment to run joins on much larger dataframes,
>>>> so I am confused as to why in this particular case my Spark job is just
>>>> hanging. I have also tried running the same join operation using pyspark on
>>>> two 2 Million row dataframes (exactly like the one I am trying to join in
>>>> the job that gets stuck) and it runs successfully.
>>>>
>>>> I have tried caching the joined dataframe to see how much memory it is
>>>> requiring but the job gets stuck on this action too. I have also tried using
>>>> persist to memory and disk on the join, and the job seems to be stuck all
>>>> the same.
>>>>
>>>> Any help as to where to look for the source of the problem would be much
>>>> appreciated.
>>>>
>>>> Cheers,
>>>>
>>>> Tamara
>>>>
>>>
>>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark Job Hanging on Join

Posted by Mohannad Ali <ma...@gmail.com>.
Hello everyone,

I'm working with Tamara and I wanted to give you guys an update on the
issue:

1. Here is the output of .explain():

> Project
> [sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L,customer_id#25L
> AS new_customer_id#38L,country#24 AS new_country#39,email#26 AS
> new_email#40,birthdate#29 AS new_birthdate#41,gender#31 AS
> new_gender#42,fk_created_at_date#32 AS
> new_fk_created_at_date#43,age_range#30 AS new_age_range#44,first_name#27 AS
> new_first_name#45,last_name#28 AS new_last_name#46]
>  BroadcastNestedLoopJoin BuildLeft, LeftOuter, Some((((customer_id#1L =
> customer_id#25L) || (isnull(customer_id#1L) && isnull(customer_id#25L))) &&
> ((country#2 = country#24) || (isnull(country#2) && isnull(country#24)))))
>   Scan
> PhysicalRDD[country#24,customer_id#25L,email#26,first_name#27,last_name#28,birthdate#29,age_range#30,gender#31,fk_created_at_date#32]
>   Scan
> ParquetRelation[hdfs:///databases/dimensions/customer_dimension][sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L]


2. Setting spark.sql.autoBroadcastJoinThreshold=-1 didn't make a
difference. It still hangs indefinitely.
3. We are using Spark 1.5.2
4. We tried running this with 4 executors, 9 executors, and even in local
mode with master set to "local[4]". The issue still persists in all cases.
5. Even without trying to cache any of the dataframes this issue still
happens.
6. We have about 200 partitions.

Any help would be appreciated!

Best Regards,
Mo

On Sun, Feb 21, 2016 at 8:39 PM, Gourav Sengupta <go...@gmail.com>
wrote:

> Sorry,
>
> please include the following questions to the list above:
>
> the SPARK version?
> whether you are using RDD or DataFrames?
> is the code run locally or in SPARK Cluster mode or in AWS EMR?
>
>
> Regards,
> Gourav Sengupta
>
> On Sun, Feb 21, 2016 at 7:37 PM, Gourav Sengupta <
> gourav.sengupta@gmail.com> wrote:
>
>> Hi Tamara,
>>
>> few basic questions first.
>>
>> How many executors are you using?
>> Is the data getting all cached into the same executor?
>> How many partitions do you have of the data?
>> How many fields are you trying to use in the join?
>>
>> If you need any help in finding answer to these questions please let me
>> know. From what I reckon joins like yours should not take more than a few
>> milliseconds.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Fri, Feb 19, 2016 at 5:31 PM, Tamara Mendt <tm...@hellofresh.com> wrote:
>>
>>> Hi all,
>>>
>>> I am running a Spark job that gets stuck attempting to join two
>>> dataframes. The dataframes are not very large, one is about 2 M rows, and
>>> the other a couple of thousand rows and the resulting joined dataframe
>>> should be about the same size as the smaller dataframe. I have tried
>>> triggering execution of the join using the 'first' operator, which as far
>>> as I understand would not require processing the entire resulting dataframe
>>> (maybe I am mistaken though). The Spark UI is not telling me anything, just
>>> showing the task to be stuck.
>>>
>>> When I run the exact same job on a slightly smaller dataset it works
>>> without hanging.
>>>
>>> I have used the same environment to run joins on much larger dataframes,
>>> so I am confused as to why in this particular case my Spark job is just
>>> hanging. I have also tried running the same join operation using pyspark on
>>> two 2 Million row dataframes (exactly like the one I am trying to join in
>>> the job that gets stuck) and it runs successfully.
>>>
>>> I have tried caching the joined dataframe to see how much memory it is
>>> requiring but the job gets stuck on this action too. I have also tried
>>> using persist to memory and disk on the join, and the job seems to be stuck
>>> all the same.
>>>
>>> Any help as to where to look for the source of the problem would be much
>>> appreciated.
>>>
>>> Cheers,
>>>
>>> Tamara
>>>
>>>
>>
>

Re: Spark Job Hanging on Join

Posted by Gourav Sengupta <go...@gmail.com>.
Sorry,

please add the following questions to the list above:

the SPARK version?
whether you are using RDD or DataFrames?
is the code run locally or in SPARK Cluster mode or in AWS EMR?


Regards,
Gourav Sengupta

On Sun, Feb 21, 2016 at 7:37 PM, Gourav Sengupta <go...@gmail.com>
wrote:

> Hi Tamara,
>
> few basic questions first.
>
> How many executors are you using?
> Is the data getting all cached into the same executor?
> How many partitions do you have of the data?
> How many fields are you trying to use in the join?
>
> If you need any help in finding answer to these questions please let me
> know. From what I reckon joins like yours should not take more than a few
> milliseconds.
>
>
> Regards,
> Gourav Sengupta
>
> On Fri, Feb 19, 2016 at 5:31 PM, Tamara Mendt <tm...@hellofresh.com> wrote:
>
>> Hi all,
>>
>> I am running a Spark job that gets stuck attempting to join two
>> dataframes. The dataframes are not very large, one is about 2 M rows, and
>> the other a couple of thousand rows and the resulting joined dataframe
>> should be about the same size as the smaller dataframe. I have tried
>> triggering execution of the join using the 'first' operator, which as far
>> as I understand would not require processing the entire resulting dataframe
>> (maybe I am mistaken though). The Spark UI is not telling me anything, just
>> showing the task to be stuck.
>>
>> When I run the exact same job on a slightly smaller dataset it works
>> without hanging.
>>
>> I have used the same environment to run joins on much larger dataframes,
>> so I am confused as to why in this particular case my Spark job is just
>> hanging. I have also tried running the same join operation using pyspark on
>> two 2 Million row dataframes (exactly like the one I am trying to join in
>> the job that gets stuck) and it runs successfully.
>>
>> I have tried caching the joined dataframe to see how much memory it is
>> requiring but the job gets stuck on this action too. I have also tried
>> using persist to memory and disk on the join, and the job seems to be stuck
>> all the same.
>>
>> Any help as to where to look for the source of the problem would be much
>> appreciated.
>>
>> Cheers,
>>
>> Tamara
>>
>>
>

Re: Spark Job Hanging on Join

Posted by Gourav Sengupta <go...@gmail.com>.
Hi Tamara,

A few basic questions first.

How many executors are you using?
Is the data getting all cached into the same executor?
How many partitions do you have of the data?
How many fields are you trying to use in the join?

If you need any help finding answers to these questions, please let me
know. From what I reckon, joins like yours should not take more than a few
milliseconds.
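
For example, something like this gives the partition count and a rough
idea of skew (df is a placeholder for whichever dataframe you are joining):

    # Number of partitions backing the dataframe:
    print(df.rdd.getNumPartitions())

    # Rows per partition, to spot whether everything ended up in one place:
    print(df.rdd.mapPartitions(lambda part: [sum(1 for _ in part)]).collect())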


Regards,
Gourav Sengupta

On Fri, Feb 19, 2016 at 5:31 PM, Tamara Mendt <tm...@hellofresh.com> wrote:

> Hi all,
>
> I am running a Spark job that gets stuck attempting to join two
> dataframes. The dataframes are not very large, one is about 2 M rows, and
> the other a couple of thousand rows and the resulting joined dataframe
> should be about the same size as the smaller dataframe. I have tried
> triggering execution of the join using the 'first' operator, which as far
> as I understand would not require processing the entire resulting dataframe
> (maybe I am mistaken though). The Spark UI is not telling me anything, just
> showing the task to be stuck.
>
> When I run the exact same job on a slightly smaller dataset it works
> without hanging.
>
> I have used the same environment to run joins on much larger dataframes,
> so I am confused as to why in this particular case my Spark job is just
> hanging. I have also tried running the same join operation using pyspark on
> two 2 Million row dataframes (exactly like the one I am trying to join in
> the job that gets stuck) and it runs successfully.
>
> I have tried caching the joined dataframe to see how much memory it is
> requiring but the job gets stuck on this action too. I have also tried
> using persist to memory and disk on the join, and the job seems to be stuck
> all the same.
>
> Any help as to where to look for the source of the problem would be much
> appreciated.
>
> Cheers,
>
> Tamara
>
>

Re: Spark Job Hanging on Join

Posted by Dave Moyers <da...@icloud.com>.
Try this setting in your Spark defaults:

spark.sql.autoBroadcastJoinThreshold=-1

I had a similar problem with joins hanging and that resolved it for me. 

You might be able to pass that value from the driver as a --conf option, but I have not tried that, and I am not sure whether that will work.
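
Something along these lines (a sketch only; the --conf and setConf routes
are ones I have not verified myself):

    # In conf/spark-defaults.conf (one line):
    #   spark.sql.autoBroadcastJoinThreshold  -1
    #
    # Or on spark-submit:
    #   --conf spark.sql.autoBroadcastJoinThreshold=-1
    #
    # Or from PySpark driver code before running the join:
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")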

Sent from my iPad

> On Feb 19, 2016, at 11:31 AM, Tamara Mendt <tm...@hellofresh.com> wrote:
> 
> Hi all, 
> 
> I am running a Spark job that gets stuck attempting to join two dataframes. The dataframes are not very large, one is about 2 M rows, and the other a couple of thousand rows and the resulting joined dataframe should be about the same size as the smaller dataframe. I have tried triggering execution of the join using the 'first' operator, which as far as I understand would not require processing the entire resulting dataframe (maybe I am mistaken though). The Spark UI is not telling me anything, just showing the task to be stuck.
> 
> When I run the exact same job on a slightly smaller dataset it works without hanging.
> 
> I have used the same environment to run joins on much larger dataframes, so I am confused as to why in this particular case my Spark job is just hanging. I have also tried running the same join operation using pyspark on two 2 Million row dataframes (exactly like the one I am trying to join in the job that gets stuck) and it runs successfully.
> 
> I have tried caching the joined dataframe to see how much memory it is requiring but the job gets stuck on this action too. I have also tried using persist to memory and disk on the join, and the job seems to be stuck all the same. 
> 
> Any help as to where to look for the source of the problem would be much appreciated.
> 
> Cheers,
> 
> Tamara
> 

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark Job Hanging on Join

Posted by Michael Armbrust <mi...@databricks.com>.
Please include the output of running explain() when reporting performance
issues with DataFrames.
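
For example (joined_df stands for whichever dataframe hangs):

    # Prints the physical plan; explain(True) also prints the logical plans.
    joined_df.explain()
    joined_df.explain(True)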

On Fri, Feb 19, 2016 at 9:31 AM, Tamara Mendt <tm...@hellofresh.com> wrote:

> Hi all,
>
> I am running a Spark job that gets stuck attempting to join two
> dataframes. The dataframes are not very large, one is about 2 M rows, and
> the other a couple of thousand rows and the resulting joined dataframe
> should be about the same size as the smaller dataframe. I have tried
> triggering execution of the join using the 'first' operator, which as far
> as I understand would not require processing the entire resulting dataframe
> (maybe I am mistaken though). The Spark UI is not telling me anything, just
> showing the task to be stuck.
>
> When I run the exact same job on a slightly smaller dataset it works
> without hanging.
>
> I have used the same environment to run joins on much larger dataframes,
> so I am confused as to why in this particular case my Spark job is just
> hanging. I have also tried running the same join operation using pyspark on
> two 2 Million row dataframes (exactly like the one I am trying to join in
> the job that gets stuck) and it runs succesfully.
>
> I have tried caching the joined dataframe to see how much memory it is
> requiring but the job gets stuck on this action too. I have also tried
> using persist to memory and disk on the join, and the job seems to be stuck
> all the same.
>
> Any help as to where to look for the source of the problem would be much
> appreciated.
>
> Cheers,
>
> Tamara
>
>