Posted to dev@spark.apache.org by Jacek Laskowski <ja...@japila.pl> on 2016/11/19 20:19:07 UTC

Analyzing and reusing cached Datasets

Hi,

There might be a bug in how analyzing Datasets or looking up cached
Datasets works. I'm on master.

➜  spark git:(master) ✗ ./bin/spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT
      /_/

Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_112
Branch master
Compiled by user jacek on 2016-11-19T08:39:43Z
Revision 2a40de408b5eb47edba92f9fe92a42ed1e78bf98
Url https://github.com/apache/spark.git
Type --help for more information.

After reviewing CacheManager and how caching works for Datasets I
thought the following query would use the cached Dataset but it does
not.

// Cache Dataset -- it is lazy
scala> val df = spark.range(1).cache
df: org.apache.spark.sql.Dataset[Long] = [id: bigint]

// Trigger caching
scala> df.show
+---+
| id|
+---+
|  0|
+---+

// Visit http://localhost:4040/storage to see the Dataset cached. And it is.

// Use the cached Dataset in another query
// Notice InMemoryRelation in use for cached queries
// It works as expected.
scala> df.withColumn("newId", 'id).explain(extended = true)
== Parsed Logical Plan ==
'Project [*, 'id AS newId#16]
+- Range (0, 1, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id: bigint, newId: bigint
Project [id#0L, id#0L AS newId#16L]
+- Range (0, 1, step=1, splits=Some(8))

== Optimized Logical Plan ==
Project [id#0L, id#0L AS newId#16L]
+- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      +- *Range (0, 1, step=1, splits=Some(8))

== Physical Plan ==
*Project [id#0L, id#0L AS newId#16L]
+- InMemoryTableScan [id#0L]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 1, step=1, splits=Some(8))

I hoped that the following query would also use the cached Dataset, but
it does not. Should it? I thought that QueryExecution.withCachedData [1]
would do the trick.

[1] https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala#L70

// The following snippet uses spark.range(1), which is the same as the
// one cached above
// Why does the physical plan not use InMemoryTableScan and InMemoryRelation?
scala> spark.range(1).withColumn("new", 'id).explain(extended = true)
== Parsed Logical Plan ==
'Project [*, 'id AS new#29]
+- Range (0, 1, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id: bigint, new: bigint
Project [id#26L, id#26L AS new#29L]
+- Range (0, 1, step=1, splits=Some(8))

== Optimized Logical Plan ==
Project [id#26L, id#26L AS new#29L]
+- Range (0, 1, step=1, splits=Some(8))

== Physical Plan ==
*Project [id#26L, id#26L AS new#29L]
+- *Range (0, 1, step=1, splits=Some(8))

Regards,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org


Re: Analyzing and reusing cached Datasets

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi Michael,

Thanks a lot for your prompt answer. I greatly appreciate it.

Having said that, I think we might be...cough...cough...wrong :)

I think the "issue" is in QueryPlan.sameResult [1] as its scaladoc says:

   * Since its likely undecidable to generally determine if two given plans will produce the same
   * results, it is okay for this function to return false, even if the results are actually
   * the same.  Such behavior will not affect correctness, only the application of performance
   * enhancements like caching.  However, it is not acceptable to return true if the results could
   * possibly be different.

   * By default this function performs a modified version of equality that is tolerant of cosmetic
   * differences like attribute naming and or expression id differences. Operators that
   * can do better should override this function.

[1] https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala#L370
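
To make the scaladoc above concrete, here is a minimal sketch in plain
Scala with no Spark dependency. Everything in it (Attr, RangeOp,
ProjectOp, normalize, sameResult) is a hypothetical stand-in invented
for illustration. It is not Catalyst code and not how
QueryPlan.sameResult is implemented; it only shows the idea of an
equality check that ignores attribute names and expression ids.

```scala
// Toy model only: hypothetical stand-ins, not Spark's Catalyst classes.
object SameResultSketch {
  // A column reference: a name plus a unique expression id (the #N in plan output).
  final case class Attr(name: String, exprId: Long)

  sealed trait Plan
  final case class RangeOp(start: Long, end: Long, step: Long, id: Attr) extends Plan
  // Each column is a pair (source attribute, output attribute).
  final case class ProjectOp(columns: Seq[(Attr, Attr)], child: Plan) extends Plan

  // Blank out names and renumber expression ids in order of first appearance
  // (children first), so only the shape and the data flow of the plan remain.
  def normalize(plan: Plan): Plan = {
    val ordinals = scala.collection.mutable.Map.empty[Long, Long]
    def canon(a: Attr): Attr =
      Attr("", ordinals.getOrElseUpdate(a.exprId, ordinals.size.toLong))
    def go(p: Plan): Plan = p match {
      case RangeOp(start, end, step, id) => RangeOp(start, end, step, canon(id))
      case ProjectOp(columns, child) =>
        val normalizedChild = go(child) // number the child's ids first
        ProjectOp(columns.map { case (in, out) => (canon(in), canon(out)) }, normalizedChild)
    }
    go(plan)
  }

  // Equality that tolerates cosmetic differences (names, expression ids).
  def sameResult(left: Plan, right: Plan): Boolean =
    normalize(left) == normalize(right)
}
```

In this sketch, RangeOp(0, 1, 1, Attr("id", 0)) and
RangeOp(0, 1, 1, Attr("id", 26)) are not equal as case classes because
their expression ids differ, yet sameResult returns true for them. That
mirrors id#0L versus id#26L in the two plans from my first email.
Changing the end of the range from 1 to 2 makes sameResult return false.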

I don't think there is anything in Analyzer or SparkOptimizer that
prevents the cached data from being used.

What do you think about:

1. Adding a few TRACE messages in sameResult? (I'm doing it anyway to
hunt down the "issue".)
2. Defining an override for sameResult in Range (as LocalRelation and
other logical operators do)?

Somehow I feel Spark could do better. Please guide me (and help me get
better at this low-level infra of Spark SQL). Thanks!

Regards,
Jacek Laskowski


On Sun, Nov 20, 2016 at 3:52 AM, Michael Armbrust
<mi...@databricks.com> wrote:
> You are hitting a weird optimization in withColumn.  Specifically, to avoid
> building up huge trees with chained calls to this method, we collapse
> projections eagerly (instead of waiting for the optimizer).
>
> Typically we look for cached data in between analysis and optimization, so
> that optimizations won't change our ability to recognize cached query
> plans.  However, in this case the eager optimization is thwarting that.


Re: Analyzing and reusing cached Datasets

Posted by Michael Armbrust <mi...@databricks.com>.
You are hitting a weird optimization in withColumn.  Specifically, to avoid
building up huge trees with chained calls to this method, we collapse
projections eagerly (instead of waiting for the optimizer).

Typically we look for cached data in between analysis and optimization, so
that optimizations won't change our ability to recognize cached query
plans.  However, in this case the eager optimization is thwarting that.
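
The ordering point above can be sketched with a toy model in plain
Scala (no Spark dependency). All names here (Scan, Project, Cached,
useCachedData, collapseProject, usesCache) are hypothetical stand-ins
chosen for illustration. This is not the CacheManager or the withColumn
code path; it only shows why rewriting a plan before the cache lookup
can hide a fragment that would otherwise match a cached plan.

```scala
// Toy model only: hypothetical stand-ins, not Spark's Catalyst or CacheManager.
object CacheLookupSketch {
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Project(columns: List[String], child: Plan) extends Plan
  // Plays the role of InMemoryRelation: a fragment served from the cache.
  final case class Cached(original: Plan) extends Plan

  // Top-down cache lookup: replace any fragment equal to a cached plan.
  def useCachedData(plan: Plan, cache: Set[Plan]): Plan =
    if (cache.contains(plan)) Cached(plan)
    else plan match {
      case Project(columns, child) => Project(columns, useCachedData(child, cache))
      case other => other
    }

  // A tiny "optimization": merge adjacent projections, keeping the outer columns.
  // Valid in this toy because columns are plain names, never computed expressions.
  def collapseProject(plan: Plan): Plan = plan match {
    case Project(outer, Project(_, child)) => collapseProject(Project(outer, child))
    case Project(columns, child) => Project(columns, collapseProject(child))
    case other => other
  }

  // Does the plan read from the cache anywhere?
  def usesCache(plan: Plan): Boolean = plan match {
    case Cached(_) => true
    case Project(_, child) => usesCache(child)
    case Scan(_) => false
  }
}
```

Take a cached plan Project(List("id", "a"), Scan("t")) and a query
Project(List("id"), <that cached plan>). Running useCachedData first and
collapseProject second keeps the Cached node in the result. Running
collapseProject first turns the query into Project(List("id"), Scan("t")),
which no longer contains the cached fragment, so the lookup finds
nothing.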
