You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Wenchen Fan (JIRA)" <ji...@apache.org> on 2018/07/07 02:56:00 UTC
[jira] [Assigned] (SPARK-24569) Spark Aggregator with output type
Option[Boolean] creates column of type Row
[ https://issues.apache.org/jira/browse/SPARK-24569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan reassigned SPARK-24569:
-----------------------------------
Assignee: Liang-Chi Hsieh
> Spark Aggregator with output type Option[Boolean] creates column of type Row
> ----------------------------------------------------------------------------
>
> Key: SPARK-24569
> URL: https://issues.apache.org/jira/browse/SPARK-24569
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.1
> Environment: OSX
> Reporter: John Conwell
> Assignee: Liang-Chi Hsieh
> Priority: Major
> Fix For: 2.4.0
>
>
> Spark SQL Aggregator that returns an output column of Option[Boolean] creates a column of type StructField(<col_name>,StructType(StructField(value,BooleanType,true)),true) instead of StructField(<col_name>,BooleanType,true).
> In other words it puts a Row instance into the new column
>
> Reproduction
>
> {code:java}
> class OptionBooleanAggregatorTest extends BaseFreeSpec {
> val ss: SparkSession = getSparkSession
> "test option" in {
> import ss.implicits._
> val df = List(
> Thing("bob", Some(true)),
> Thing("bob", Some(false)),
> Thing("bob", None))
> .toDF()
> val group = df
> .groupBy("name")
> .agg(OptionBooleanAggregator("isGood").toColumn.alias("isGood"))
> .cache()
> assert(group.schema("name").dataType == StringType)
> //this will fail
> assert(group.schema("isGood").dataType == BooleanType)
> }
> }
> case class Thing(name: String, isGood: Option[Boolean])
> case class OptionBooleanAggregator(colName: String) extends Aggregator[Row, Option[Boolean], Option[Boolean]] {
> override def zero: Option[Boolean] = Option.empty[Boolean]
> override def reduce(buffer: Option[Boolean], row: Row): Option[Boolean] = {
> val index = row.fieldIndex(colName)
> val value = if (row.isNullAt(index))
> Option.empty[Boolean]
> else
> Some(row.getBoolean(index))
> merge(buffer, value)
> }
> override def merge(b1: Option[Boolean], b2: Option[Boolean]): Option[Boolean] = {
> if ((b1.isDefined && b1.get) || (b2.isDefined && b2.get)) {
> Some(true)
> }
> else if (b1.isDefined) {
> b1
> }
> else
> b2
> }
> override def finish(reduction: Option[Boolean]): Option[Boolean] = reduction
> override def bufferEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder
> override def outputEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder
> def OptionalBoolEncoder: org.apache.spark.sql.Encoder[Option[Boolean]] = org.apache.spark.sql.catalyst.encoders.ExpressionEncoder()
> }
> {code}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org