Posted to user@spark.apache.org by Corey Nolet <cj...@gmail.com> on 2014/11/07 02:36:14 UTC

Re: Selecting Based on Nested Values using Language Integrated Query Syntax

Michael,

Thanks for the explanation. I was able to get this running.
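
For the archive, here is a minimal sketch of the query with the join key
changed to a column that the ln subquery actually produces. It assumes the
fix is simply swapping ln.streetNumber for ln.locationNumber, as the plan
quoted below suggests; it is not necessarily the exact statement that ended
up running. The name correctedQuery is made up for illustration, and hctx is
assumed to be the HiveContext used earlier in the thread.

```scala
// Sketch only: the HiveQL text with the join key corrected.
// The `ln` subquery produces just two columns, locationName and
// locationNumber, so ln.streetNumber can never resolve.
val correctedQuery: String =
  """SELECT p.name, p.age, ln.locationName
    |FROM people p
    |LATERAL VIEW explode(locations) l AS location
    |JOIN locationNames ln ON location.number = ln.locationNumber
    |WHERE location.number = '2300'""".stripMargin

// Assumption: `hctx` is an existing HiveContext and the `people` and
// `locationNames` tables are already registered with it.
// hctx.sql(correctedQuery).collect()
```

The execution call is left commented out so the snippet stands on its own
without a Spark session; uncomment it in a shell where hctx exists.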

On Wed, Oct 29, 2014 at 3:07 PM, Michael Armbrust <mi...@databricks.com>
wrote:

> We are working on more helpful error messages, but in the meantime let me
> explain how to read this output.
>
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
> attributes: 'p.name,'p.age, tree:
>
> Project ['p.name,'p.age]
>  Filter ('location.number = 2300)
>   Join Inner, Some((location#110.number AS number#111 = 'ln.streetnumber))
>    Generate explode(locations#10), true, false, Some(l)
>     LowerCaseSchema
>      Subquery p
>       Subquery people
>        SparkLogicalPlan (ExistingRdd [age#9,locations#10,name#11],
> MappedRDD[28] at map at JsonRDD.scala:38)
>    LowerCaseSchema
>     Subquery ln
>      Subquery locationNames
>       SparkLogicalPlan (ExistingRdd [locationName#80,locationNumber#81],
> MappedRDD[99] at map at JsonRDD.scala:38)
>
> 'tickedFields indicate a failure to resolve, whereas numbered#10
> attributes have been resolved. (The numbers are globally unique and can be
> used to disambiguate where a column is coming from when the names are the
> same.)
>
> Resolution happens bottom up.  So the first place that there is a problem
> is 'ln.streetnumber, which prevents the rest of the query from resolving.
> If you look at the subquery ln, it is only producing two columns:
> locationName and locationNumber. So "streetnumber" is not valid.
>
>
> On Tue, Oct 28, 2014 at 8:02 PM, Corey Nolet <cj...@gmail.com> wrote:
>
>> scala> locations.queryExecution
>>
>> warning: there were 1 feature warning(s); re-run with -feature for details
>>
>> res28: _4.sqlContext.QueryExecution forSome { val _4:
>> org.apache.spark.sql.SchemaRDD } =
>>
>> == Parsed Logical Plan ==
>>
>> SparkLogicalPlan (ExistingRdd [locationName#80,locationNumber#81],
>> MappedRDD[99] at map at JsonRDD.scala:38)
>>
>>
>> == Analyzed Logical Plan ==
>>
>> SparkLogicalPlan (ExistingRdd [locationName#80,locationNumber#81],
>> MappedRDD[99] at map at JsonRDD.scala:38)
>>
>>
>> == Optimized Logical Plan ==
>>
>> SparkLogicalPlan (ExistingRdd [locationName#80,locationNumber#81],
>> MappedRDD[99] at map at JsonRDD.scala:38)
>>
>>
>> == Physical Plan ==
>>
>> ExistingRdd [locationName#80,locationNumber#81], MappedRDD[99] at map at
>> JsonRDD.scala:38
>>
>>
>> Code Generation: false
>>
>> == RDD ==
>>
>>
>> scala> people.queryExecution
>>
>> warning: there were 1 feature warning(s); re-run with -feature for details
>>
>> res29: _5.sqlContext.QueryExecution forSome { val _5:
>> org.apache.spark.sql.SchemaRDD } =
>>
>> == Parsed Logical Plan ==
>>
>> SparkLogicalPlan (ExistingRdd [age#9,locations#10,name#11], MappedRDD[28]
>> at map at JsonRDD.scala:38)
>>
>>
>> == Analyzed Logical Plan ==
>>
>> SparkLogicalPlan (ExistingRdd [age#9,locations#10,name#11], MappedRDD[28]
>> at map at JsonRDD.scala:38)
>>
>>
>> == Optimized Logical Plan ==
>>
>> SparkLogicalPlan (ExistingRdd [age#9,locations#10,name#11], MappedRDD[28]
>> at map at JsonRDD.scala:38)
>>
>>
>> == Physical Plan ==
>>
>> ExistingRdd [age#9,locations#10,name#11], MappedRDD[28] at map at
>> JsonRDD.scala:38
>>
>>
>> Code Generation: false
>>
>> == RDD ==
>>
>>
>>
>> Here's what happens when I try executing the join with the lateral view
>> explode():
>>
>>
>> 14/10/28 23:05:35 INFO ParseDriver: Parse Completed
>>
>> org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
>> Unresolved attributes: 'p.name,'p.age, tree:
>>
>> Project ['p.name,'p.age]
>>
>>  Filter ('location.number = 2300)
>>
>>   Join Inner, Some((location#110.number AS number#111 = 'ln.streetnumber))
>>
>>    Generate explode(locations#10), true, false, Some(l)
>>
>>     LowerCaseSchema
>>
>>      Subquery p
>>
>>       Subquery people
>>
>>        SparkLogicalPlan (ExistingRdd [age#9,locations#10,name#11],
>> MappedRDD[28] at map at JsonRDD.scala:38)
>>
>>    LowerCaseSchema
>>
>>     Subquery ln
>>
>>      Subquery locationNames
>>
>>       SparkLogicalPlan (ExistingRdd [locationName#80,locationNumber#81],
>> MappedRDD[99] at map at JsonRDD.scala:38)
>>
>>
>> at
>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:72)
>>
>> at
>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:70)
>>
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
>>
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156)
>>
>> at
>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:70)
>>
>> at
>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:68)
>>
>> at
>> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
>>
>> at
>> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
>>
>> at
>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>
>> at
>> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
>>
>> at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
>>
>> at
>> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
>>
>> at
>> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
>>
>> at scala.collection.immutable.List.foreach(List.scala:318)
>>
>> at
>> org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
>>
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:397)
>>
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:397)
>>
>> at
>> org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan$lzycompute(HiveContext.scala:358)
>>
>> at
>> org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan(HiveContext.scala:357)
>>
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
>>
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
>>
>> On Tue, Oct 28, 2014 at 10:48 PM, Michael Armbrust <
>> michael@databricks.com> wrote:
>>
>>> Can you println the .queryExecution of the SchemaRDD?
>>>
>>> On Tue, Oct 28, 2014 at 7:43 PM, Corey Nolet <cj...@gmail.com> wrote:
>>>
>>>> So this appears to work just fine:
>>>>
>>>> hctx.sql("SELECT p.name, p.age  FROM people p LATERAL VIEW
>>>> explode(locations) l AS location JOIN location5 lo ON l.number =
>>>> lo.streetNumber WHERE location.number = '2300'").collect()
>>>>
>>>> But as soon as I try to join with another set based on a property from
>>>> the exploded locations set, I get invalid attribute exceptions:
>>>>
>>>> hctx.sql("SELECT p.name, p.age, ln.locationName  FROM people as p
>>>> LATERAL VIEW explode(locations) l AS location JOIN locationNames ln ON
>>>> location.number = ln.streetNumber WHERE location.number = '2300'").collect()
>>>>
>>>>
>>>> On Tue, Oct 28, 2014 at 10:19 PM, Michael Armbrust <
>>>> michael@databricks.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Tue, Oct 28, 2014 at 6:56 PM, Corey Nolet <cj...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Am I able to do a join on an exploded field?
>>>>>>
>>>>>> Like if I have another object:
>>>>>>
>>>>>> { "streetNumber":"2300", "locationName":"The Big Building"} and I
>>>>>> want to join with the previous json by the locations[].number field- is
>>>>>> that possible?
>>>>>>
>>>>>
>>>>> I'm not sure I fully understand the question, but once it's exploded
>>>>> it's a normal tuple and you can do any operations on it.  The explode is
>>>>> just producing a new row for each element in the array.
>>>>>
>>>>>>> Awesome, this is what I was looking for. So it's possible to use hive
>>>>>>> dialect in a regular sql context? This is what was confusing to me- the
>>>>>>> docs kind of allude to it but don't directly point it out.
>>>>>>>
>>>>>>
>>>>> No, you need a HiveContext as we use the actual hive parser
>>>>> (SQLContext only exists as a separate entity so that people who don't want
>>>>> Hive's dependencies in their app can still use a limited subset of Spark
>>>>> SQL).
>>>>>
>>>>
>>>>
>>>
>>
>