You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Cheng Lian (JIRA)" <ji...@apache.org> on 2014/11/02 16:41:34 UTC
[jira] [Resolved] (SPARK-3914) InMemoryRelation should inherit
statistics of its child to enable broadcast join
[ https://issues.apache.org/jira/browse/SPARK-3914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Cheng Lian resolved SPARK-3914.
-------------------------------
Resolution: Fixed
> InMemoryRelation should inherit statistics of its child to enable broadcast join
> --------------------------------------------------------------------------------
>
> Key: SPARK-3914
> URL: https://issues.apache.org/jira/browse/SPARK-3914
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.1.0
> Reporter: Cheng Lian
> Assignee: Cheng Lian
>
> When a table/query is cached, {{InMemoryRelation}} stores the physical plan rather than the logical plan of the original table/query, thus loses the statistics information and disables broadcast join optimization.
> Sample {{spark-shell}} session to reproduce this issue:
> {code}
> val sparkContext = sc
> import org.apache.spark.sql._
> import sparkContext._
> val sqlContext = new SQLContext(sparkContext)
> import sqlContext._
> case class Sale(year: Int)
> makeRDD((1 to 100).map(Sale(_))).registerTempTable("sales")
> sql("select distinct year from sales limit 10").registerTempTable("tinyTable")
> cacheTable("tinyTable")
> sql("select * from sales join tinyTable on sales.year = tinyTable.year").queryExecution.executedPlan
> ...
> res3: org.apache.spark.sql.execution.SparkPlan =
> Project [year#4,year#5]
> ShuffledHashJoin [year#4], [year#5], BuildRight
> Exchange (HashPartitioning [year#4], 200)
> PhysicalRDD [year#4], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:37
> Exchange (HashPartitioning [year#5], 200)
> InMemoryColumnarTableScan [year#5], [], (InMemoryRelation [year#5], false, 1000, StorageLevel(true, true, false, true, 1), (Limit 10))
> {code}
> A workaround for this is to add a {{LIMIT}} operator above the {{InMemoryColumnarTableScan}} operator:
> {code}
> sql("select * from sales join (select * from tinyTable limit 10) tiny on sales.year = tiny.year").queryExecution.executedPlan
> ...
> res8: org.apache.spark.sql.execution.SparkPlan =
> Project [year#12,year#13]
> BroadcastHashJoin [year#12], [year#13], BuildRight
> PhysicalRDD [year#12], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:37
> Limit 10
> InMemoryColumnarTableScan [year#13], [], (InMemoryRelation [year#13], false, 1000, StorageLevel(true, true, false, true, 1), (Limit 10))
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org