You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Ritika Maheshwari (Jira)" <ji...@apache.org> on 2023/02/21 19:34:00 UTC

[jira] [Comment Edited] (SPARK-41391) The output column name of `groupBy.agg(count_distinct)` is incorrect

    [ https://issues.apache.org/jira/browse/SPARK-41391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691760#comment-17691760 ] 

Ritika Maheshwari edited comment on SPARK-41391 at 2/21/23 7:33 PM:
--------------------------------------------------------------------

A better solution

 

In *RelationalGroupedDataset* changed the following method to add another case

private[this] def alias(expr: Expression): NamedExpression = expr match {

    case expr: NamedExpression => expr

    case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>

      UnresolvedAlias(a, Some(Column.generateAlias))

    *case ag: UnresolvedFunction if (containsStar(Seq(ag))) =>*

      *UnresolvedAlias(expr, None)*

    case expr: Expression =>

      Alias(expr, toPrettySQL(expr))()

  }

In *SQLImplicit.scala* MADE CHANGE TO FOLLOWING METHOD

implicit class StringToColumn(val sc: StringContext) {
def $(args: Any*): ColumnName = {
{*}if (sc.parts.length == 1 && sc.parts.contains("{*}")) {*
{*}new ColumnName("{*}")*
*}* else

{ new ColumnName(sc.s(args: _*)) }

This seems to work and this create a tree structure similar to what you get when you use spark sql for aggrgate queries. Major difference between scala and spark sql was that spark sql was creating an unresolvedalias for count (distinct * ) expressions where as scala was creating an alias where the alias was count(unresolvedstar()) 

scala> val df = spark.range(1, 10).withColumn("value", lit(1))

{*}df{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, value: int]

 

scala> df.createOrReplaceTempView("table")

 

scala>  df.groupBy("id").agg(count_distinct($"*"))

{*}res1{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, value): bigint]

 

scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")

{*}res2{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, value): bigint]

 

scala> df.groupBy("id").agg(count_distinct($"value"))

{*}res3{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, COUNT(value): bigint]

 

scala> 

 


was (Author: ritikam):
A better solution

 

In *RelationalGroupedDataset* changed the following method to add another case

private[this] def alias(expr: Expression): NamedExpression = expr match {
case expr: NamedExpression => expr
case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
UnresolvedAlias(a, Some(Column.generateAlias))
*case ag: UnresolvedFunction if (containsStar(Seq(ag))) =>*
 *UnresolvedAlias(expr, None)*
case expr: Expression =>
Alias(expr, toPrettySQL(expr))()
}

In *SQLImplicit.scala* MADE CHANGE TO FOLLOWING METHOD

implicit class StringToColumn(val sc: StringContext) {
def $(args: Any*): ColumnName = {
*if (sc.parts.length == 1 && sc.parts.contains("*")) {*
 *new ColumnName("*")*
 *}* else {
new ColumnName(sc.s(args: _*))
}

This seems to work and this create a tree structure similar to what you get when you use spark sql for aggrgate queries. Major difference between scala and spark sql was that spark sql was creating an unresolvedalias for count (distinct * ) expressions where as scala was creating an alias where the alias was count(unresolvedstar()) 

scala> val df = spark.range(1, 10).withColumn("value", lit(1))

*df*: *org.apache.spark.sql.DataFrame* = [id: bigint, value: int]

 

scala> df.createOrReplaceTempView("table")

 

scala>  df.groupBy("id").agg(count_distinct($"*"))

*res1*: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, value): bigint]

 

scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")

*res2*: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, value): bigint]

 

scala> df.groupBy("id").agg(count_distinct($"value"))

*res3*: *org.apache.spark.sql.DataFrame* = [id: bigint, COUNT(value): bigint]

 

scala> 

 

> The output column name of `groupBy.agg(count_distinct)` is incorrect
> --------------------------------------------------------------------
>
>                 Key: SPARK-41391
>                 URL: https://issues.apache.org/jira/browse/SPARK-41391
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.0, 3.3.0, 3.4.0
>            Reporter: Ruifeng Zheng
>            Priority: Major
>
> scala> val df = spark.range(1, 10).withColumn("value", lit(1))
> df: org.apache.spark.sql.DataFrame = [id: bigint, value: int]
> scala> df.createOrReplaceTempView("table")
> scala> df.groupBy("id").agg(count_distinct($"value"))
> res1: org.apache.spark.sql.DataFrame = [id: bigint, count(value): bigint]
> scala> spark.sql(" SELECT id, COUNT(DISTINCT value) FROM table GROUP BY id ")
> res2: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT value): bigint]
> scala> df.groupBy("id").agg(count_distinct($"*"))
> res3: org.apache.spark.sql.DataFrame = [id: bigint, count(unresolvedstar()): bigint]
> scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")
> res4: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT id, value): bigint]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org