You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Evan Chan (JIRA)" <ji...@apache.org> on 2016/01/07 02:43:39 UTC

[jira] [Commented] (SPARK-11838) Spark SQL query fragment RDD reuse

    [ https://issues.apache.org/jira/browse/SPARK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15086649#comment-15086649 ] 

Evan Chan commented on SPARK-11838:
-----------------------------------

Based on everything that is said, it seems instead of having SparkPlan compute an RDD[InternalRow], for fragments detected to have been "cached" it would have an alternate SparkPlan that can just return the previously computed RDD[InternalRow] (or UnsafeRow)?

Even if substitution into the LogicalPlan was done, it seems you would need some Strategy to parse the different operators generated by the substitution (i.e., for in memory cache, there needs to be the cacheManager substitution, then the InMemoryStrategy which returns an InMemoryColumnarScan.  Actually this could be optimized, i think, to just substitution of an InMemoryDataSource by the cacheManager, so that a strategy is not needed)

> Spark SQL query fragment RDD reuse
> ----------------------------------
>
>                 Key: SPARK-11838
>                 URL: https://issues.apache.org/jira/browse/SPARK-11838
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Mikhail Bautin
>
> With many analytical Spark SQL workloads against slowly changing tables, successive queries frequently share fragments that produce the same result. Instead of re-computing those fragments for every query, it makes sense to detect similar fragments and substitute RDDs previously created for matching SparkPlan fragments into every new SparkPlan at the execution time whenever possible. Even if no RDDs are persist()-ed to memory/disk/off-heap memory, many stages can still be skipped due to map output files being present on executor nodes.
> The implementation involves the following steps:
> (1) Logical plan "canonicalization". 
> Logical plans mapping to the same "canonical" logical plan should always produce the same results (except for possible output column reordering), although the inverse statement won't always be true. 
>   - Re-mapping expression ids to "canonical expression ids" (successively increasing numbers always starting with 1).
>   - Eliminating alias names that are unimportant after analysis completion. Only the names that are necessary to determine the Hive table columns to be scanned are retained.
>   - Reordering columns in projections, grouping/aggregation expressions, etc. This can be done e.g. by using the string representation as a sort key. Union inputs always have to be reordered the same way.
>   - Tree traversal has to happen starting from leaves and progressing towards the root, because we need to already have identified canonical expression ids for children of a node before we can come up with sort keys that would allow to reorder expressions in a node deterministically. This is a bit more complicated for Union nodes.
>   - Special handling for MetastoreRelations. We replace MetastoreRelation with a special class CanonicalMetastoreRelation that uses attributes and partitionKeys as part of its equals() and hashCode() implementation, but the visible attributes and aprtitionKeys are restricted to expression ids that the rest of the query actually needs from that MetastoreRelation.
> An example of logical plans and corresponding canonical logical plans: https://gist.githubusercontent.com/mbautin/ef1317b341211d9606cf/raw
> (2) Tracking LogicalPlan fragments corresponding to SparkPlan fragments. When generating a SparkPlan, we keep an optional reference to a LogicalPlan instance in every node. This allows us to populate the cache with mappings from canonical logical plans of query fragments to the corresponding RDDs generated as part of query execution. Note that there is no new work necessary to generate the RDDs, we are merely utilizing the RDDs that would have been produced as part of SparkPlan execution anyway.
> (3) SparkPlan fragment substitution. After generating a SparkPlan and before calling prepare() or execute() on it, we check if any of its nodes have an associated LogicalPlan that maps to a canonical logical plan matching a cache entry. If so, we substitute a PhysicalRDD (or a new class UnsafePhysicalRDD wrapping an RDD of UnsafeRow) scanning the previously created RDD instead of the current query fragment. If the expected column order differs from what the current SparkPlan fragment produces, we add a projection to reorder the columns. We also add safe/unsafe row conversions as necessary to match the row type that is expected by the parent of the current SparkPlan fragment.
> (4) The execute() method of SparkPlan also needs to perform the cache lookup and substitution described above before producing a new RDD for the current SparkPlan node. The "loading cache" pattern (e.g. as implemented in Guava) allows to reuse query fragments between simultaneously submitted queries: whichever query runs execute() for a particular fragment's canonical logical plan starts producing an RDD first, and if another query has a fragment with the same canonical logical plan, it waits for the RDD to be produced by the first query and inserts it in its SparkPlan instead.
> This kind of query fragment caching will mostly be useful for slowly-changing or static tables. Even with slowly-changing tables, the cache needs to be invalidated when those data set changes take place. One of the following approaches could be used:
> - Application logic could explicitly invalidate the cache when it detects a change
> - We could add a key that encodes the set of files in HDFS present at the moment of LogicalPlan creation to CanonicalMetastoreRelation
> - We could append version numbers to table names that are increased whenever a table is updated. This version number stays in the LogicalPlan but gets removed before doing a Hive table lookup. It could also be used to filter the set of files to scan from the Hive table.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org