You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Wenchen Fan (JIRA)" <ji...@apache.org> on 2018/07/07 02:55:00 UTC

[jira] [Resolved] (SPARK-24569) Spark Aggregator with output type Option[Boolean] creates column of type Row

     [ https://issues.apache.org/jira/browse/SPARK-24569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan resolved SPARK-24569.
---------------------------------
       Resolution: Fixed
    Fix Version/s: 2.4.0

Issue resolved by pull request 21611
[https://github.com/apache/spark/pull/21611]

> Spark Aggregator with output type Option[Boolean] creates column of type Row
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-24569
>                 URL: https://issues.apache.org/jira/browse/SPARK-24569
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.1
>         Environment: OSX
>            Reporter: John Conwell
>            Priority: Major
>             Fix For: 2.4.0
>
>
> Spark SQL Aggregator that returns an output column of Option[Boolean] creates a column of type StructField(<col_name>,StructType(StructField(value,BooleanType,true)),true) instead of StructField(<col_name>,BooleanType,true).  
> In other words it puts a Row instance into the new column
>  
> Reproduction
>  
> {code:java}
> class OptionBooleanAggregatorTest extends BaseFreeSpec {
>   val ss: SparkSession = getSparkSession
>   "test option" in {
>     import ss.implicits._
>     val df = List(
>       Thing("bob", Some(true)),
>       Thing("bob", Some(false)),
>       Thing("bob", None))
>       .toDF()
>     val group = df
>       .groupBy("name")
>       .agg(OptionBooleanAggregator("isGood").toColumn.alias("isGood"))
>       .cache()
>     assert(group.schema("name").dataType == StringType)
>     //this will fail
>     assert(group.schema("isGood").dataType == BooleanType)
>   }
> }
> case class Thing(name: String, isGood: Option[Boolean])
> case class OptionBooleanAggregator(colName: String) extends Aggregator[Row, Option[Boolean], Option[Boolean]] {
>   override def zero: Option[Boolean] = Option.empty[Boolean]
>   override def reduce(buffer: Option[Boolean], row: Row): Option[Boolean] = {
>     val index = row.fieldIndex(colName)
>     val value = if (row.isNullAt(index))
>       Option.empty[Boolean]
>     else
>       Some(row.getBoolean(index))
>     merge(buffer, value)
>   }
>   override def merge(b1: Option[Boolean], b2: Option[Boolean]): Option[Boolean] = {
>     if ((b1.isDefined && b1.get) || (b2.isDefined && b2.get)) {
>       Some(true)
>     }
>     else if (b1.isDefined) {
>       b1
>     }
>     else
>       b2
>   }
>   override def finish(reduction: Option[Boolean]): Option[Boolean] = reduction
>   override def bufferEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder
>   override def outputEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder
>   def OptionalBoolEncoder: org.apache.spark.sql.Encoder[Option[Boolean]] = org.apache.spark.sql.catalyst.encoders.ExpressionEncoder()
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org