Posted to issues@spark.apache.org by "Michael Armbrust (JIRA)" <ji...@apache.org> on 2015/08/04 03:34:04 UTC

[jira] [Updated] (SPARK-8466) Bug in SQL Optimizer: Unresolved Attribute after pushing Filter below Project

     [ https://issues.apache.org/jira/browse/SPARK-8466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Armbrust updated SPARK-8466:
------------------------------------
    Description: 
Input data: a Parquet file stored at hdfs://xxxx/data with two columns (lifeAverageBitrateKbps int, playtimems int).
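
For anyone reproducing this, an input file of the same shape can be written from spark-shell first. This is a minimal sketch using the Spark 1.3/1.4 DataFrame API; the sample rows are made up and hdfs://xxxx/data is the placeholder path from the report:
{code}
import sqlContext.implicits._

// Two int columns matching the reported schema; the values are illustrative.
val sample = Seq((1200, 45000), (800, 0), (300, 12000))
  .toDF("lifeAverageBitrateKbps", "playtimems")
sample.saveAsParquetFile("hdfs://xxxx/data")
{code}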

=================================
Script used in spark-shell:
{code}
val df = sqlContext.parquetFile("hdfs://xxxx/data")
import org.apache.spark.sql.types._

// Re-cast any Double/Float columns to unlimited-precision decimals,
// leaving all other column types (including the two int columns here)
// unchanged. Even when no type actually changes, this select introduces
// a Project of cast-and-alias expressions over the relation.
val cols = df.schema.fields.map { f =>
  val dataType = f.dataType match {
    case DoubleType | FloatType => DecimalType.Unlimited
    case t => t
  }
  df.col(f.name).cast(dataType).as(f.name)
}

// Register under the table name used by the query below.
df.select(cols: _*).registerTempTable("anon_sdm2_ss")

val query = 
"""
|SELECT avg(cleanedplaytimems),
|       count(1)
|FROM
|  (SELECT 0 key,
|          avg(lifeAverageBitrateKbps) avgbitrate
|   FROM anon_sdm2_ss
|   WHERE lifeAverageBitrateKbps > 0) t1,
|  (SELECT 0 key,
|          lifeAverageBitrateKbps,
|          if(playtimems > 0, playtimems, 0) cleanedplaytimems
|   FROM anon_sdm2_ss
|   WHERE lifeAverageBitrateKbps > 0) t2
|WHERE t1.key=t2.key
|  AND t2.lifeAverageBitrateKbps < 0.5 * t1.avgbitrate
""".stripMargin

sqlContext.sql(query).explain(true)
{code}
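
Since both input columns are int, the Double/Float-to-Decimal mapping is a no-op here; what matters is that the cast-and-alias select still puts a Project (defining attributes like lifeaveragebitratekbps#105 from #27) between the registered table and the raw relation, and that is the Project the filter later gets pushed through. The registered schema can be checked before running the query; a small sketch:
{code}
// Confirm the registered table's schema (both columns remain int).
sqlContext.table("anon_sdm2_ss").printSchema()
{code}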
===========================
Output:

{code}
== Analyzed Logical Plan ==
Aggregate [], [AVG(CAST(cleanedplaytimems#110, LongType)) AS _c0#111,COUNT(1) AS _c1#112L]
 Filter ((key#107 = key#109) && (CAST(lifeAverageBitrateKbps#105, DoubleType) < (0.5 * avgbitrate#108)))
  Join Inner, None
   Subquery t1
    Aggregate [], [0 AS key#107,AVG(CAST(lifeAverageBitrateKbps#105, LongType)) AS avgbitrate#108]
     Filter (lifeAverageBitrateKbps#105 > 0)
      Subquery anon_sdm2_ss
       Project [CAST(lifeaveragebitratekbps#27, IntegerType) AS lifeaveragebitratekbps#105,CAST(playtimems#89, IntegerType) AS playtimems#106]
        Relation[lifeaveragebitratekbps#27,playtimems#89] ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
   Subquery t2
    Project [0 AS key#109,lifeAverageBitrateKbps#105,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFIf((playtimems#106 > 0),playtimems#106,0) AS cleanedplaytimems#110]
     Filter (lifeAverageBitrateKbps#105 > 0)
      Subquery anon_sdm2_ss
       Project [CAST(lifeaveragebitratekbps#27, IntegerType) AS lifeaveragebitratekbps#105,CAST(playtimems#89, IntegerType) AS playtimems#106]
        Relation[lifeaveragebitratekbps#27,playtimems#89] ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)

== Optimized Logical Plan ==
Aggregate [], [AVG(CAST(cleanedplaytimems#110, LongType)) AS _c0#111,COUNT(1) AS _c1#112L]
 Project [cleanedplaytimems#110]
  Join Inner, Some(((key#107 = key#109) && (CAST(lifeAverageBitrateKbps#105, DoubleType) < (0.5 * avgbitrate#108))))
   Aggregate [], [0 AS key#107,AVG(CAST(lifeAverageBitrateKbps#105, LongType)) AS avgbitrate#108]
    Project [lifeaveragebitratekbps#27 AS lifeaveragebitratekbps#105]
     !Filter (lifeAverageBitrateKbps#105 > 0)
      Relation[lifeaveragebitratekbps#27,playtimems#89] ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
   Project [0 AS key#109,lifeaveragebitratekbps#27 AS lifeaveragebitratekbps#105,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFIf((playtimems#89 AS playtimems#106 > 0),playtimems#89 AS playtimems#106,0) AS cleanedplaytimems#110]
    !Filter (lifeAverageBitrateKbps#105 > 0)
     Relation[lifeaveragebitratekbps#27,playtimems#89] ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
{code}

Note:
Both Filter operators in the optimized plan are unresolved (marked with a leading !). The pushed-down condition still references lifeAverageBitrateKbps#105, but it now sits below the Project that defines #105 as an alias of lifeaveragebitratekbps#27, so the attribute it refers to no longer exists at that point in the plan. The second Project's expressions also contain leftover aliases (playtimems#89 AS playtimems#106) embedded inside the UDF call, another symptom of the incomplete alias substitution.
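
For context, here is a minimal sketch, not the actual Catalyst rule, of the alias substitution that pushing a Filter below a Project requires; the function name and structure are illustrative. The plan above suggests the substitution was applied when rewriting the project lists but not to the pushed filter conditions:
{code}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

// Illustrative sketch of pushing a Filter below a Project.
def pushFilterBelowProject(plan: LogicalPlan): LogicalPlan = plan match {
  case Filter(condition, Project(fields, child)) =>
    // Map each aliased output attribute to its underlying expression,
    // e.g. lifeaveragebitratekbps#105 -> lifeaveragebitratekbps#27.
    val aliasMap: Map[Attribute, Expression] =
      fields.collect { case a: Alias => (a.toAttribute, a.child) }.toMap
    // The pushed condition must be rewritten in terms of the child's own
    // attributes. Skipping this substitution leaves references such as
    // lifeAverageBitrateKbps#105 dangling below the Project, which is
    // exactly the unresolved !Filter shown above.
    val substituted = condition.transform {
      case a: Attribute => aliasMap.getOrElse(a, a)
    }
    Project(fields, Filter(substituted, child))
  case other => other
}
{code}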




> Bug in SQL Optimizer: Unresolved Attribute after pushing Filter below Project
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-8466
>                 URL: https://issues.apache.org/jira/browse/SPARK-8466
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.3.1, 1.4.0
>            Reporter: Kai Zeng
>            Priority: Critical
>


