Posted to issues@spark.apache.org by "Cheng Lian (JIRA)" <ji...@apache.org> on 2014/11/02 16:41:34 UTC

[jira] [Resolved] (SPARK-3914) InMemoryRelation should inherit statistics of its child to enable broadcast join

     [ https://issues.apache.org/jira/browse/SPARK-3914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Lian resolved SPARK-3914.
-------------------------------
    Resolution: Fixed

> InMemoryRelation should inherit statistics of its child to enable broadcast join
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-3914
>                 URL: https://issues.apache.org/jira/browse/SPARK-3914
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.1.0
>            Reporter: Cheng Lian
>            Assignee: Cheng Lian
>
> When a table/query is cached, {{InMemoryRelation}} stores the physical plan rather than the logical plan of the original table/query. As a result, the statistics information is lost and the broadcast join optimization is disabled.
> Sample {{spark-shell}} session to reproduce this issue:
> {code}
> val sparkContext = sc
> import org.apache.spark.sql._
> import sparkContext._
> val sqlContext = new SQLContext(sparkContext)
> import sqlContext._
> case class Sale(year: Int)
> makeRDD((1 to 100).map(Sale(_))).registerTempTable("sales")
> sql("select distinct year from sales limit 10").registerTempTable("tinyTable")
> cacheTable("tinyTable")
> sql("select * from sales join tinyTable on sales.year = tinyTable.year").queryExecution.executedPlan
> ...
> res3: org.apache.spark.sql.execution.SparkPlan =
> Project [year#4,year#5]
>  ShuffledHashJoin [year#4], [year#5], BuildRight
>   Exchange (HashPartitioning [year#4], 200)
>    PhysicalRDD [year#4], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:37
>   Exchange (HashPartitioning [year#5], 200)
>    InMemoryColumnarTableScan [year#5], [], (InMemoryRelation [year#5], false, 1000, StorageLevel(true, true, false, true, 1), (Limit 10))
> {code}
> A workaround is to add a {{LIMIT}} operator above the {{InMemoryColumnarTableScan}} operator, after which the planner picks {{BroadcastHashJoin}} instead of {{ShuffledHashJoin}}:
> {code}
> sql("select * from sales join (select * from tinyTable limit 10) tiny on sales.year = tiny.year").queryExecution.executedPlan
> ...
> res8: org.apache.spark.sql.execution.SparkPlan =
> Project [year#12,year#13]
>  BroadcastHashJoin [year#12], [year#13], BuildRight
>   PhysicalRDD [year#12], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:37
>   Limit 10
>    InMemoryColumnarTableScan [year#13], [], (InMemoryRelation [year#13], false, 1000, StorageLevel(true, true, false, true, 1), (Limit 10))
> {code}
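The effect described in the issue can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not Spark's implementation: Statistics, Plan, KnownSizeRelation, CachedWithoutStats, CachedWithStats, and JoinPlanner are hypothetical stand-ins, and the threshold value is arbitrary. It assumes the planner broadcasts the build side only when its estimated size is at or below a threshold, and that a relation with no size estimate is treated as too large to broadcast.

```scala
// Simplified model of statistics-driven join selection.
// All names below are hypothetical stand-ins, not Spark's own classes.
final case class Statistics(sizeInBytes: BigInt)

sealed trait Plan {
  def statistics: Statistics
}

// A source whose size estimate is known.
final case class KnownSizeRelation(name: String, sizeInBytes: BigInt) extends Plan {
  val statistics: Statistics = Statistics(sizeInBytes)
}

// Cached relation that drops the child's statistics (the reported behavior).
// With no estimate available, it is assumed to be too large to broadcast.
final case class CachedWithoutStats(child: Plan) extends Plan {
  val statistics: Statistics = Statistics(BigInt(Long.MaxValue))
}

// Cached relation that inherits the child's statistics (the requested behavior).
final case class CachedWithStats(child: Plan) extends Plan {
  val statistics: Statistics = child.statistics
}

object JoinPlanner {
  // Broadcast only when the build side's estimated size fits the threshold.
  def chooseJoin(buildSide: Plan, broadcastThreshold: BigInt): String =
    if (buildSide.statistics.sizeInBytes <= broadcastThreshold) "BroadcastHashJoin"
    else "ShuffledHashJoin"
}

object Demo {
  def main(args: Array[String]): Unit = {
    val tiny = KnownSizeRelation("tinyTable", BigInt(40)) // assumed size estimate
    val threshold = BigInt(10000)                          // arbitrary threshold
    println(JoinPlanner.chooseJoin(CachedWithoutStats(tiny), threshold))
    println(JoinPlanner.chooseJoin(CachedWithStats(tiny), threshold))
  }
}
```

In this model the only difference between the two cached variants is whether the size estimate survives caching, which is enough to flip the join choice for the same underlying data.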
