Posted to user@spark.apache.org by twinkle sachdeva <tw...@gmail.com> on 2014/09/25 07:18:51 UTC
Using one sql query's result inside another sql query
Hi,
I am using HiveContext to run SQL queries inside Spark. I have
created a SchemaRDD (let's call it cachedSchema) inside my code.
If I run a SQL query (Query 1) on top of it, it works.
But if I refer to Query1's result inside another SQL query, that fails. Note
that I have already registered Query1's result as a temp table.
registerTempTable(cachedSchema)
Queryresult1 = Query1 using cachedSchema [ works ]
registerTempTable(Queryresult1)
Queryresult2 = Query2 using Queryresult1 [ FAILS ]
Is this expected? Is there any known workaround?
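For reference, the flow looks roughly like this (a minimal sketch; the temp
table names are made up, and the field names are taken from the plan in the
stack trace below):

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)  // sc is the existing SparkContext

    // cachedSchema is the SchemaRDD built earlier in the code
    cachedSchema.registerTempTable("cached_schema")

    // Query 1 against the temp table works
    val queryResult1 = hiveContext.sql(
      "SELECT F1, F2, F3, F4, F6, Count FROM cached_schema")
    queryResult1.registerTempTable("query_result1")

    // Query 2 against Query 1's result fails during analysis
    val queryResult2 = hiveContext.sql(
      "SELECT f1, f2, f3, f4 FROM query_result1 WHERE count > 3")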
Following is the exception I am receiving:
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'f1,'f2,'f3,'f4, tree:
Project ['f1,'f2,'f3,'f4]
 Filter ('count > 3)
  LowerCaseSchema
   Subquery x
    Project ['F1,'F2,'F3,'F4,'F6,'Count]
     LowerCaseSchema
      Subquery src
       SparkLogicalPlan (ExistingRdd [F1#0,F2#1,F3#2,F4#3,F5#4,F6#5,Count#6], MappedRDD[4] at map at SQLBlock.scala:64)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:72)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:70)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:68)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
  at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
  at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
  at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
  at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:397)
  at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:397)
  at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan$lzycompute(HiveContext.scala:358)
  at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan(HiveContext.scala:357)
  at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
  at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
  at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
  at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
  at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360)
  at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360)
  at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:120)
  at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
  at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:189)
Re: Using one sql query's result inside another sql query
Posted by Cheng Lian <li...@gmail.com>.
This workaround looks good to me. In this way, all queries are still
executed lazily within a single DAG, and Spark SQL can still optimize
the query plan as a whole.
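One way to see this, assuming the applySchema flow quoted below, is to
inspect the final SchemaRDD through its developer API before running any
action:

    // The optimized plan spans both queries; nothing has executed yet.
    println(queryResult2.queryExecution.optimizedPlan)

    // Only an action triggers execution, and the whole plan then runs
    // as a single job:
    queryResult2.collect()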
On 9/29/14 11:26 AM, twinkle sachdeva wrote:
> Thanks Cheng.
>
> For the time being, as a workaround, I applied the schema to
> Queryresult1 and then registered the result as a temp table.
> Although that works, I was not sure of the performance impact, as it
> might block some optimisations in some scenarios.
>
> This flow (on Spark 1.1) works:
>
> registerTempTable(cachedSchema)
> Queryresult1 = Query1 using cachedSchema [ works ]
>
> queryResult1withSchema = hiveContext.applySchema( Queryresult1,
> Queryresult1.schema )
> registerTempTable(queryResult1withSchema)
>
> Queryresult2 = Query2 using queryResult1withSchema [ works ]
Re: Using one sql query's result inside another sql query
Posted by twinkle sachdeva <tw...@gmail.com>.
Thanks Cheng.
For the time being, as a workaround, I applied the schema
to Queryresult1 and then registered the result as a temp table. Although
that works, I was not sure of the performance impact, as it might block
some optimisations in some scenarios.
This flow (on Spark 1.1) works:
registerTempTable(cachedSchema)
Queryresult1 = Query1 using cachedSchema [ works ]
queryResult1withSchema = hiveContext.applySchema( Queryresult1, Queryresult1.schema )
registerTempTable(queryResult1withSchema)
Queryresult2 = Query2 using queryResult1withSchema [ works ]
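Spelled out (a sketch only; the temp table name is illustrative), that is:

    // Re-wrapping the query result with its own (already analyzed) schema
    // registers a plan whose attribute names are resolved, which sidesteps
    // the case-sensitivity issue in the stored un-analyzed plan.
    val queryResult1withSchema =
      hiveContext.applySchema(queryResult1, queryResult1.schema)
    queryResult1withSchema.registerTempTable("query_result1")

    // Query 2 against the re-registered table now resolves:
    val queryResult2 = hiveContext.sql(
      "SELECT f1, f2, f3, f4 FROM query_result1 WHERE count > 3")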
Re: Using one sql query's result inside another sql query
Posted by Cheng Lian <li...@gmail.com>.
Hi Twinkle,
The failure is caused by case sensitivity. The temp table actually
stores the original un-analyzed logical plan, so the field names remain
capitalized (F1, F2, etc.). I believe this issue has already been fixed by
PR #2382 <https://github.com/apache/spark/pull/2382>. As a workaround,
you can use lowercase letters in field names instead.
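For instance, if cachedSchema is built from a case class, defining the
fields in lowercase from the start keeps the stored plan consistent with
later queries. A rough sketch (the class, field names, and input path are
illustrative):

    // Lowercase field names keep the un-analyzed plan stored for the temp
    // table consistent with the lower-cased references in follow-up queries.
    case class Record(f1: String, f2: String, f3: String,
                      f4: String, f5: String, f6: String, count: Int)

    import hiveContext.createSchemaRDD  // implicit RDD[Record] -> SchemaRDD

    val cachedSchema = sc.textFile("input.csv").map(_.split(",")).map { p =>
      Record(p(0), p(1), p(2), p(3), p(4), p(5), p(6).trim.toInt)
    }
    cachedSchema.registerTempTable("src")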
Cheng