Posted to issues@spark.apache.org by "Yin Huai (JIRA)" <ji...@apache.org> on 2015/05/27 21:59:17 UTC
[jira] [Updated] (SPARK-6489) Optimize lateral view with explode to not read unnecessary columns
[ https://issues.apache.org/jira/browse/SPARK-6489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yin Huai updated SPARK-6489:
----------------------------
Target Version/s: 1.5.0 (was: 1.4.0)
> Optimize lateral view with explode to not read unnecessary columns
> ------------------------------------------------------------------
>
> Key: SPARK-6489
> URL: https://issues.apache.org/jira/browse/SPARK-6489
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 1.3.0
> Reporter: Konstantin Shaposhnikov
> Labels: starter
>
> Currently, a query with "lateral view explode(...)" results in an execution plan that reads all columns of the underlying RDD.
> For example, given a *ppl* table that is a DataFrame created from the Person case class:
> {code}
> case class Person(name: String, age: Int, data: Array[Int])
> {code}
> the following SQL:
> {code}
> select name, sum(d) from ppl lateral view explode(data) d as d group by name
> {code}
> executes as follows:
> {noformat}
> == Physical Plan ==
> Aggregate false, [name#0], [name#0,SUM(PartialSum#38L) AS _c1#18L]
> Exchange (HashPartitioning [name#0], 200)
> Aggregate true, [name#0], [name#0,SUM(CAST(d#21, LongType)) AS PartialSum#38L]
> Project [name#0,d#21]
> Generate explode(data#2), true, false
> InMemoryColumnarTableScan [name#0,age#1,data#2], [], (InMemoryRelation [name#0,age#1,data#2], true, 10000, StorageLevel(true, true, false, true, 1), (PhysicalRDD [name#0,age#1,data#2], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:35), Some(ppl))
> {noformat}
> Note that the *age* column is not needed to produce the output, but it is still read from the underlying RDD.
> A sample program to demonstrate the issue:
> {code}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.hive.HiveContext
>
> case class Person(name: String, age: Int, data: Array[Int])
>
> object ExplodeDemo {
>   def main(args: Array[String]): Unit = {
>     val ppl = Array(
>       Person("A", 20, Array(10, 12, 19)),
>       Person("B", 25, Array(7, 8, 4)),
>       Person("C", 19, Array(12, 4, 232)))
>
>     val conf = new SparkConf().setMaster("local[2]").setAppName("sql")
>     val sc = new SparkContext(conf)
>     val sqlCtx = new HiveContext(sc)
>     import sqlCtx.implicits._
>
>     val df = sc.makeRDD(ppl).toDF
>     df.registerTempTable("ppl")
>     // Cache the table; otherwise the scan goes through ExistingRDD, which does not support column pruning.
>     sqlCtx.cacheTable("ppl")
>
>     val s = sqlCtx.sql("select name, sum(d) from ppl lateral view explode(data) d as d group by name")
>     s.explain(true)
>     sc.stop()
>   }
> }
> {code}
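The pruning the issue asks for can be illustrated with a toy model. In the plan above, the Project over Generate references only name#0 and d#21, and the generator itself reads only data#2, so the scan below Generate needs just name and data. The sketch below is a hypothetical illustration of that reasoning under these assumptions; the names Generate and requiredChildColumns are invented for this example and are not Catalyst's API or the eventual fix.

```scala
// Hypothetical toy model of column pruning below a Generate node.
// None of these names are Spark/Catalyst APIs; they exist only for illustration.

// A Generate node: the generator reads `generatorInputs` from its child and
// emits `generatorOutput`; the child's columns are passed through alongside it.
final case class Generate(
    generatorInputs: Set[String], // e.g. Set("data") for explode(data)
    generatorOutput: Set[String], // e.g. Set("d")
    childOutput: Seq[String])     // columns produced by the scan below Generate

object ColumnPruningSketch {
  // Columns the child scan must produce: those referenced by the operators
  // above Generate that are not produced by the generator itself, plus the
  // generator's own inputs. Filtering childOutput keeps only real child
  // columns and preserves their original order.
  def requiredChildColumns(g: Generate, referencedAbove: Set[String]): Seq[String] = {
    val needed = (referencedAbove -- g.generatorOutput) ++ g.generatorInputs
    g.childOutput.filter(needed.contains)
  }

  def main(args: Array[String]): Unit = {
    // Models: select name, sum(d) from ppl lateral view explode(data) d as d group by name
    val gen = Generate(Set("data"), Set("d"), Seq("name", "age", "data"))
    val pruned = requiredChildColumns(gen, referencedAbove = Set("name", "d"))
    println(pruned.mkString(", ")) // prints: name, data
  }
}
```

With this rule, *age* drops out because nothing above the scan references it, which is the behavior the issue requests; a real optimizer rule would work on attribute references (such as name#0) rather than plain strings.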
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)