You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Chin Wei Low <lo...@gmail.com> on 2016/10/07 04:03:46 UTC

Spark SQL is slower when DataFrame is cache in Memory

Hi,

I am using Spark 1.6.0. I have a Spark application that create and cache
(in memory) DataFrames (around 50+, with some on single parquet file and
some on folder with a few parquet files) with the following codes:

val df = sqlContext.read.parquet
df.persist
df.count

I union them to 3 DataFrames and register that as temp table.

Then, run the following codes:
val res = sqlContext.sql("table1 union table2 union table3")
res.collect()

The res.collect() is slower when I cache the DataFrame compare to without
cache. e.g. 3 seconds vs 1 second

I turn on the DEBUG log and see there is a gap from the res.collect() to
start the Spark job.

Is the extra time taken by the query planning & optimization? It does not
show the gap when I do not cache the dataframe.

Anything I am missing here?

Regards,
Chin Wei

Re: Spark SQL is slower when DataFrame is cache in Memory

Posted by Kazuaki Ishizaki <IS...@jp.ibm.com>.
Hi Chin Wei,
Thank you for confirming this on 2.0.1 and being happy to hear it never 
happens.

The performance will be improved when this PR (
https://github.com/apache/spark/pull/15219) is integrated.

Regards,
Kazuaki Ishizaki



From:   Chin Wei Low <lo...@gmail.com>
To:     Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc:     user <us...@spark.apache.org>
Date:   2016/10/25 17:33
Subject:        Re: Spark SQL is slower when DataFrame is cache in Memory



Hi Kazuaki,

I print a debug log right before I call the collect, and use that to 
compare against the job start log (it is available when turning on debug 
log).
Anyway, I test that in Spark 2.0.1 and never see it happen. But, the query 
on cached dataframe is still slightly slower than the one without cached 
when it is running on Spark 2.0.1.

Regards,
Low Chin Wei

On Tue, Oct 25, 2016 at 3:39 AM, Kazuaki Ishizaki <IS...@jp.ibm.com> 
wrote:
Hi Chin Wei,
I am sorry for being late to reply.

Got it. Interesting behavior. How did you measure the time between 1st and 
2nd events?

Best Regards,
Kazuaki Ishizaki



From:        Chin Wei Low <lo...@gmail.com>
To:        Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc:        user@spark.apache.org
Date:        2016/10/10 11:33

Subject:        Re: Spark SQL is slower when DataFrame is cache in Memory



Hi Ishizaki san,

Thanks for the reply.

So, when I pre-cache the dataframe, the cache is being used during the job 
execution.

Actually there are 3 events:
1. call res.collect
2. job started
3. job completed

I am concerning about the longer time taken between 1st and 2nd events. It 
seems like the query planning and optimization is longer when query on 
cached dataframe.


Regards,
Chin Wei

On Fri, Oct 7, 2016 at 10:14 PM, Kazuaki Ishizaki <IS...@jp.ibm.com> 
wrote:
Hi Chin Wei,
Yes, since you force to create a cache by executing df.count, Spark starts 
to get data from cache for the following task:
val res = sqlContext.sql("table1 union table2 union table3")
res.collect()

If you insert 'res.explain', you can confirm which resource you use to get 
data, cache or parquet?
val res = sqlContext.sql("table1 union table2 union table3")
res.explain(true)
res.collect()

Do I make some misunderstandings?

Best Regards,
Kazuaki Ishizaki



From:        Chin Wei Low <lo...@gmail.com>
To:        Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc:        user@spark.apache.org
Date:        2016/10/07 20:06
Subject:        Re: Spark SQL is slower when DataFrame is cache in Memory




Hi Ishizaki san,

So there is a gap between res.collect
and when I see this log:   spark.SparkContext: Starting job: collect at 
<console>:26

What you mean is, during this time Spark already start to get data from 
cache? Isn't it should only get the data after the job is started and 
tasks are distributed?

Regards,
Chin Wei


On Fri, Oct 7, 2016 at 3:43 PM, Kazuaki Ishizaki <IS...@jp.ibm.com> 
wrote:
Hi,
I think that the result looks correct. The current Spark spends extra time 
for getting data from a cache. There are two reasons. One is for a 
complicated path to get a data. The other is for decompression in the case 
of a primitive type.
The new implementation (https://github.com/apache/spark/pull/15219) is 
ready for review. It would achieve 1.2x performance improvement for a 
compressed column and much performance improvement for an uncompressed 
column.

Best Regards,
Kazuaki Ishizaki



From:        Chin Wei Low <lo...@gmail.com>
To:        user@spark.apache.org
Date:        2016/10/07 13:05
Subject:        Spark SQL is slower when DataFrame is cache in Memory




Hi,

I am using Spark 1.6.0. I have a Spark application that create and cache 
(in memory) DataFrames (around 50+, with some on single parquet file and 
some on folder with a few parquet files) with the following codes:

val df = sqlContext.read.parquet
df.persist
df.count

I union them to 3 DataFrames and register that as temp table.

Then, run the following codes:
val res = sqlContext.sql("table1 union table2 union table3")
res.collect()

The res.collect() is slower when I cache the DataFrame compare to without 
cache. e.g. 3 seconds vs 1 second

I turn on the DEBUG log and see there is a gap from the res.collect() to 
start the Spark job.

Is the extra time taken by the query planning & optimization? It does not 
show the gap when I do not cache the dataframe.

Anything I am missing here?

Regards,
Chin Wei









Re: Spark SQL is slower when DataFrame is cache in Memory

Posted by Chin Wei Low <lo...@gmail.com>.
Hi Kazuaki,

I print a debug log right before I call the collect, and use that to
compare against the job start log (it is available when turning on debug
log).
Anyway, I test that in Spark 2.0.1 and never see it happen. But, the query
on cached dataframe is still slightly slower than the one without cached
when it is running on Spark 2.0.1.

Regards,
Low Chin Wei

On Tue, Oct 25, 2016 at 3:39 AM, Kazuaki Ishizaki <IS...@jp.ibm.com>
wrote:

> Hi Chin Wei,
> I am sorry for being late to reply.
>
> Got it. Interesting behavior. How did you measure the time between 1st and
> 2nd events?
>
> Best Regards,
> Kazuaki Ishizaki
>
>
>
> From:        Chin Wei Low <lo...@gmail.com>
> To:        Kazuaki Ishizaki/Japan/IBM@IBMJP
> Cc:        user@spark.apache.org
> Date:        2016/10/10 11:33
>
> Subject:        Re: Spark SQL is slower when DataFrame is cache in Memory
> ------------------------------
>
>
>
> Hi Ishizaki san,
>
> Thanks for the reply.
>
> So, when I pre-cache the dataframe, the cache is being used during the job
> execution.
>
> Actually there are 3 events:
> 1. call res.collect
> 2. job started
> 3. job completed
>
> I am concerning about the longer time taken between 1st and 2nd events. It
> seems like the query planning and optimization is longer when query on
> cached dataframe.
>
>
> Regards,
> Chin Wei
>
> On Fri, Oct 7, 2016 at 10:14 PM, Kazuaki Ishizaki <*ISHIZAKI@jp.ibm.com*
> <IS...@jp.ibm.com>> wrote:
> Hi Chin Wei,
> Yes, since you force to create a cache by executing df.count, Spark starts
> to get data from cache for the following task:
> val res = sqlContext.sql("table1 union table2 union table3")
> res.collect()
>
> If you insert 'res.explain', you can confirm which resource you use to get
> data, cache or parquet?
> val res = sqlContext.sql("table1 union table2 union table3")
> res.explain(true)
> res.collect()
>
> Do I make some misunderstandings?
>
> Best Regards,
> Kazuaki Ishizaki
>
>
>
> From:        Chin Wei Low <*lowchinwei@gmail.com* <lo...@gmail.com>>
> To:        Kazuaki Ishizaki/Japan/IBM@IBMJP
> Cc:        *user@spark.apache.org* <us...@spark.apache.org>
> Date:        2016/10/07 20:06
> Subject:        Re: Spark SQL is slower when DataFrame is cache in Memory
>
> ------------------------------
>
>
>
> Hi Ishizaki san,
>
> So there is a gap between res.collect
> and when I see this log:   spark.SparkContext: Starting job: collect at
> <console>:26
>
> What you mean is, during this time Spark already start to get data from
> cache? Isn't it should only get the data after the job is started and tasks
> are distributed?
>
> Regards,
> Chin Wei
>
>
> On Fri, Oct 7, 2016 at 3:43 PM, Kazuaki Ishizaki <*ISHIZAKI@jp.ibm.com*
> <IS...@jp.ibm.com>> wrote:
> Hi,
> I think that the result looks correct. The current Spark spends extra time
> for getting data from a cache. There are two reasons. One is for a
> complicated path to get a data. The other is for decompression in the case
> of a primitive type.
> The new implementation (*https://github.com/apache/spark/pull/15219*
> <https://github.com/apache/spark/pull/15219>) is ready for review. It
> would achieve 1.2x performance improvement for a compressed column and much
> performance improvement for an uncompressed column.
>
> Best Regards,
> Kazuaki Ishizaki
>
>
>
> From:        Chin Wei Low <*lowchinwei@gmail.com* <lo...@gmail.com>>
> To:        *user@spark.apache.org* <us...@spark.apache.org>
> Date:        2016/10/07 13:05
> Subject:        Spark SQL is slower when DataFrame is cache in Memory
> ------------------------------
>
>
>
>
> Hi,
>
> I am using Spark 1.6.0. I have a Spark application that create and cache
> (in memory) DataFrames (around 50+, with some on single parquet file and
> some on folder with a few parquet files) with the following codes:
>
> val df = sqlContext.read.parquet
> df.persist
> df.count
>
> I union them to 3 DataFrames and register that as temp table.
>
> Then, run the following codes:
> val res = sqlContext.sql("table1 union table2 union table3")
> res.collect()
>
> The res.collect() is slower when I cache the DataFrame compare to without
> cache. e.g. 3 seconds vs 1 second
>
> I turn on the DEBUG log and see there is a gap from the res.collect() to
> start the Spark job.
>
> Is the extra time taken by the query planning & optimization? It does not
> show the gap when I do not cache the dataframe.
>
> Anything I am missing here?
>
> Regards,
> Chin Wei
>
>
>
>
>
>

Re: Spark SQL is slower when DataFrame is cache in Memory

Posted by Kazuaki Ishizaki <IS...@jp.ibm.com>.
Hi Chin Wei,
I am sorry for being late to reply.

Got it. Interesting behavior. How did you measure the time between 1st and 
2nd events?

Best Regards,
Kazuaki Ishizaki



From:   Chin Wei Low <lo...@gmail.com>
To:     Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc:     user@spark.apache.org
Date:   2016/10/10 11:33
Subject:        Re: Spark SQL is slower when DataFrame is cache in Memory



Hi Ishizaki san,

Thanks for the reply.

So, when I pre-cache the dataframe, the cache is being used during the job 
execution.

Actually there are 3 events:
1. call res.collect
2. job started
3. job completed

I am concerning about the longer time taken between 1st and 2nd events. It 
seems like the query planning and optimization is longer when query on 
cached dataframe.


Regards,
Chin Wei

On Fri, Oct 7, 2016 at 10:14 PM, Kazuaki Ishizaki <IS...@jp.ibm.com> 
wrote:
Hi Chin Wei,
Yes, since you force to create a cache by executing df.count, Spark starts 
to get data from cache for the following task:
val res = sqlContext.sql("table1 union table2 union table3")
res.collect()

If you insert 'res.explain', you can confirm which resource you use to get 
data, cache or parquet?
val res = sqlContext.sql("table1 union table2 union table3")
res.explain(true)
res.collect()

Do I make some misunderstandings?

Best Regards,
Kazuaki Ishizaki



From:        Chin Wei Low <lo...@gmail.com>
To:        Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc:        user@spark.apache.org
Date:        2016/10/07 20:06
Subject:        Re: Spark SQL is slower when DataFrame is cache in Memory




Hi Ishizaki san,

So there is a gap between res.collect
and when I see this log:   spark.SparkContext: Starting job: collect at 
<console>:26

What you mean is, during this time Spark already start to get data from 
cache? Isn't it should only get the data after the job is started and 
tasks are distributed?

Regards,
Chin Wei


On Fri, Oct 7, 2016 at 3:43 PM, Kazuaki Ishizaki <IS...@jp.ibm.com> 
wrote:
Hi,
I think that the result looks correct. The current Spark spends extra time 
for getting data from a cache. There are two reasons. One is for a 
complicated path to get a data. The other is for decompression in the case 
of a primitive type.
The new implementation (https://github.com/apache/spark/pull/15219) is 
ready for review. It would achieve 1.2x performance improvement for a 
compressed column and much performance improvement for an uncompressed 
column.

Best Regards,
Kazuaki Ishizaki



From:        Chin Wei Low <lo...@gmail.com>
To:        user@spark.apache.org
Date:        2016/10/07 13:05
Subject:        Spark SQL is slower when DataFrame is cache in Memory




Hi,

I am using Spark 1.6.0. I have a Spark application that create and cache 
(in memory) DataFrames (around 50+, with some on single parquet file and 
some on folder with a few parquet files) with the following codes:

val df = sqlContext.read.parquet
df.persist
df.count

I union them to 3 DataFrames and register that as temp table.

Then, run the following codes:
val res = sqlContext.sql("table1 union table2 union table3")
res.collect()

The res.collect() is slower when I cache the DataFrame compare to without 
cache. e.g. 3 seconds vs 1 second

I turn on the DEBUG log and see there is a gap from the res.collect() to 
start the Spark job.

Is the extra time taken by the query planning & optimization? It does not 
show the gap when I do not cache the dataframe.

Anything I am missing here?

Regards,
Chin Wei







Re: Spark SQL is slower when DataFrame is cache in Memory

Posted by Chin Wei Low <lo...@gmail.com>.
Hi Ishizaki san,

Thanks for the reply.

So, when I pre-cache the dataframe, the cache is being used during the job
execution.

Actually there are 3 events:
1. call res.collect
2. job started
3. job completed

I am concerning about the longer time taken between 1st and 2nd events. It
seems like the query planning and optimization is longer when query on
cached dataframe.


Regards,
Chin Wei

On Fri, Oct 7, 2016 at 10:14 PM, Kazuaki Ishizaki <IS...@jp.ibm.com>
wrote:

> Hi Chin Wei,
> Yes, since you force to create a cache by executing df.count, Spark starts
> to get data from cache for the following task:
> val res = sqlContext.sql("table1 union table2 union table3")
> res.collect()
>
> If you insert 'res.explain', you can confirm which resource you use to get
> data, cache or parquet?
> val res = sqlContext.sql("table1 union table2 union table3")
> res.explain(true)
> res.collect()
>
> Do I make some misunderstandings?
>
> Best Regards,
> Kazuaki Ishizaki
>
>
>
> From:        Chin Wei Low <lo...@gmail.com>
> To:        Kazuaki Ishizaki/Japan/IBM@IBMJP
> Cc:        user@spark.apache.org
> Date:        2016/10/07 20:06
> Subject:        Re: Spark SQL is slower when DataFrame is cache in Memory
>
> ------------------------------
>
>
>
> Hi Ishizaki san,
>
> So there is a gap between res.collect
> and when I see this log:   spark.SparkContext: Starting job: collect at
> <console>:26
>
> What you mean is, during this time Spark already start to get data from
> cache? Isn't it should only get the data after the job is started and tasks
> are distributed?
>
> Regards,
> Chin Wei
>
>
> On Fri, Oct 7, 2016 at 3:43 PM, Kazuaki Ishizaki <*ISHIZAKI@jp.ibm.com*
> <IS...@jp.ibm.com>> wrote:
> Hi,
> I think that the result looks correct. The current Spark spends extra time
> for getting data from a cache. There are two reasons. One is for a
> complicated path to get a data. The other is for decompression in the case
> of a primitive type.
> The new implementation (*https://github.com/apache/spark/pull/15219*
> <https://github.com/apache/spark/pull/15219>) is ready for review. It
> would achieve 1.2x performance improvement for a compressed column and much
> performance improvement for an uncompressed column.
>
> Best Regards,
> Kazuaki Ishizaki
>
>
>
> From:        Chin Wei Low <*lowchinwei@gmail.com* <lo...@gmail.com>>
> To:        *user@spark.apache.org* <us...@spark.apache.org>
> Date:        2016/10/07 13:05
> Subject:        Spark SQL is slower when DataFrame is cache in Memory
> ------------------------------
>
>
>
>
> Hi,
>
> I am using Spark 1.6.0. I have a Spark application that create and cache
> (in memory) DataFrames (around 50+, with some on single parquet file and
> some on folder with a few parquet files) with the following codes:
>
> val df = sqlContext.read.parquet
> df.persist
> df.count
>
> I union them to 3 DataFrames and register that as temp table.
>
> Then, run the following codes:
> val res = sqlContext.sql("table1 union table2 union table3")
> res.collect()
>
> The res.collect() is slower when I cache the DataFrame compare to without
> cache. e.g. 3 seconds vs 1 second
>
> I turn on the DEBUG log and see there is a gap from the res.collect() to
> start the Spark job.
>
> Is the extra time taken by the query planning & optimization? It does not
> show the gap when I do not cache the dataframe.
>
> Anything I am missing here?
>
> Regards,
> Chin Wei
>
>
>
>

Re: Spark SQL is slower when DataFrame is cache in Memory

Posted by Kazuaki Ishizaki <IS...@jp.ibm.com>.
Hi Chin Wei,
Yes, since you force to create a cache by executing df.count, Spark starts 
to get data from cache for the following task:
val res = sqlContext.sql("table1 union table2 union table3")
res.collect()

If you insert 'res.explain', you can confirm which resource you use to get 
data, cache or parquet?
val res = sqlContext.sql("table1 union table2 union table3")
res.explain(true)
res.collect()

Do I make some misunderstandings?

Best Regards,
Kazuaki Ishizaki



From:   Chin Wei Low <lo...@gmail.com>
To:     Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc:     user@spark.apache.org
Date:   2016/10/07 20:06
Subject:        Re: Spark SQL is slower when DataFrame is cache in Memory



Hi Ishizaki san,

So there is a gap between res.collect
and when I see this log:   spark.SparkContext: Starting job: collect at 
<console>:26

What you mean is, during this time Spark already start to get data from 
cache? Isn't it should only get the data after the job is started and 
tasks are distributed?

Regards,
Chin Wei


On Fri, Oct 7, 2016 at 3:43 PM, Kazuaki Ishizaki <IS...@jp.ibm.com> 
wrote:
Hi,
I think that the result looks correct. The current Spark spends extra time 
for getting data from a cache. There are two reasons. One is for a 
complicated path to get a data. The other is for decompression in the case 
of a primitive type.
The new implementation (https://github.com/apache/spark/pull/15219) is 
ready for review. It would achieve 1.2x performance improvement for a 
compressed column and much performance improvement for an uncompressed 
column.

Best Regards,
Kazuaki Ishizaki



From:        Chin Wei Low <lo...@gmail.com>
To:        user@spark.apache.org
Date:        2016/10/07 13:05
Subject:        Spark SQL is slower when DataFrame is cache in Memory




Hi,

I am using Spark 1.6.0. I have a Spark application that create and cache 
(in memory) DataFrames (around 50+, with some on single parquet file and 
some on folder with a few parquet files) with the following codes:

val df = sqlContext.read.parquet
df.persist
df.count

I union them to 3 DataFrames and register that as temp table.

Then, run the following codes:
val res = sqlContext.sql("table1 union table2 union table3")
res.collect()

The res.collect() is slower when I cache the DataFrame compare to without 
cache. e.g. 3 seconds vs 1 second

I turn on the DEBUG log and see there is a gap from the res.collect() to 
start the Spark job.

Is the extra time taken by the query planning & optimization? It does not 
show the gap when I do not cache the dataframe.

Anything I am missing here?

Regards,
Chin Wei





Re: Spark SQL is slower when DataFrame is cache in Memory

Posted by Chin Wei Low <lo...@gmail.com>.
Hi Ishizaki san,

So there is a gap between res.collect
and when I see this log:   spark.SparkContext: Starting job: collect at
<console>:26

What you mean is, during this time Spark already start to get data from
cache? Isn't it should only get the data after the job is started and tasks
are distributed?

Regards,
Chin Wei


On Fri, Oct 7, 2016 at 3:43 PM, Kazuaki Ishizaki <IS...@jp.ibm.com>
wrote:

> Hi,
> I think that the result looks correct. The current Spark spends extra time
> for getting data from a cache. There are two reasons. One is for a
> complicated path to get a data. The other is for decompression in the case
> of a primitive type.
> The new implementation (https://github.com/apache/spark/pull/15219) is
> ready for review. It would achieve 1.2x performance improvement for a
> compressed column and much performance improvement for an uncompressed
> column.
>
> Best Regards,
> Kazuaki Ishizaki
>
>
>
> From:        Chin Wei Low <lo...@gmail.com>
> To:        user@spark.apache.org
> Date:        2016/10/07 13:05
> Subject:        Spark SQL is slower when DataFrame is cache in Memory
> ------------------------------
>
>
>
> Hi,
>
> I am using Spark 1.6.0. I have a Spark application that create and cache
> (in memory) DataFrames (around 50+, with some on single parquet file and
> some on folder with a few parquet files) with the following codes:
>
> val df = sqlContext.read.parquet
> df.persist
> df.count
>
> I union them to 3 DataFrames and register that as temp table.
>
> Then, run the following codes:
> val res = sqlContext.sql("table1 union table2 union table3")
> res.collect()
>
> The res.collect() is slower when I cache the DataFrame compare to without
> cache. e.g. 3 seconds vs 1 second
>
> I turn on the DEBUG log and see there is a gap from the res.collect() to
> start the Spark job.
>
> Is the extra time taken by the query planning & optimization? It does not
> show the gap when I do not cache the dataframe.
>
> Anything I am missing here?
>
> Regards,
> Chin Wei
>
>

Re: Spark SQL is slower when DataFrame is cache in Memory

Posted by Kazuaki Ishizaki <IS...@jp.ibm.com>.
Hi,
I think that the result looks correct. The current Spark spends extra time 
for getting data from a cache. There are two reasons. One is for a 
complicated path to get a data. The other is for decompression in the case 
of a primitive type.
The new implementation (https://github.com/apache/spark/pull/15219) is 
ready for review. It would achieve 1.2x performance improvement for a 
compressed column and much performance improvement for an uncompressed 
column.

Best Regards,
Kazuaki Ishizaki



From:   Chin Wei Low <lo...@gmail.com>
To:     user@spark.apache.org
Date:   2016/10/07 13:05
Subject:        Spark SQL is slower when DataFrame is cache in Memory



Hi,

I am using Spark 1.6.0. I have a Spark application that create and cache 
(in memory) DataFrames (around 50+, with some on single parquet file and 
some on folder with a few parquet files) with the following codes:

val df = sqlContext.read.parquet
df.persist
df.count

I union them to 3 DataFrames and register that as temp table.

Then, run the following codes:
val res = sqlContext.sql("table1 union table2 union table3")
res.collect()

The res.collect() is slower when I cache the DataFrame compare to without 
cache. e.g. 3 seconds vs 1 second

I turn on the DEBUG log and see there is a gap from the res.collect() to 
start the Spark job.

Is the extra time taken by the query planning & optimization? It does not 
show the gap when I do not cache the dataframe.

Anything I am missing here?

Regards,
Chin Wei