Posted to issues@spark.apache.org by "Amit Sela (JIRA)" <ji...@apache.org> on 2016/07/10 08:28:11 UTC
[jira] [Comment Edited] (SPARK-15810) Aggregator doesn't play nice with Option
[ https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15369496#comment-15369496 ]
Amit Sela edited comment on SPARK-15810 at 7/10/16 8:28 AM:
------------------------------------------------------------
Just ran this exact code, prefixed by:
{code}
val session = SparkSession.builder().appName("TestAggregator").master("local[*]").getOrCreate()
import session.implicits._
{code}
This is the schema I get:
{noformat}
root
 |-- value: string (nullable = true)
 |-- anon$1(scala.Tuple2): struct (nullable = true)
 |    |-- value: integer (nullable = true)
{noformat}
And this is the output:
{noformat}
+-----+--------------------+
|value|anon$1(scala.Tuple2)|
+-----+--------------------+
|    a|                 [5]|
+-----+--------------------+
{noformat}
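As a sanity check on the {{[5]}} above, the aggregation logic from the issue description can be folded over the same sample data in plain Scala, without Spark. This is only a minimal sketch: {{OptionSumCheck}} and {{combine}} are hypothetical names introduced here for illustration, and {{combine}} copies the body of {{merge}} from the reported Aggregator.

```scala
object OptionSumCheck {
  // Hypothetical helper: same expression as `merge` in the reported Aggregator.
  // Sums two optional values, treating None as "no contribution".
  def combine(b1: Option[Int], b2: Option[Int]): Option[Int] =
    b1.map(b1v => b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2)

  // Folds the optional values of the sample data, starting from the
  // Aggregator's zero (None).
  def aggregate(rows: List[(String, Option[Int])]): Option[Int] =
    rows.foldLeft(Option.empty[Int])((buffer, row) => combine(buffer, row._2))

  def main(args: Array[String]): Unit = {
    // Same sample data and mapping as in the issue description.
    val input = List(("a", 1), ("a", 2), ("a", 3))
    val mapped = input.map { case (k, v) => (k, if (v > 1) Some(v) else None) }
    println(aggregate(mapped)) // prints Some(5)
  }
}
```

The values 1, 2, 3 map to None, Some(2), Some(3), so the fold yields Some(5), which matches the single row shown in the output.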
> Aggregator doesn't play nice with Option
> ----------------------------------------
>
> Key: SPARK-15810
> URL: https://issues.apache.org/jira/browse/SPARK-15810
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Environment: Spark 2.0.0-SNAPSHOT
> Reporter: koert kuipers
>
> {code}
> import org.apache.spark.sql.Encoder
> import org.apache.spark.sql.expressions.Aggregator
> // Assumes a SparkSession is in scope with its implicits imported (needed for toDS and the encoders).
> val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
> val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) }
> val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, Option[Int]), Option[Int], Option[Int]]{
>   def zero: Option[Int] = None
>   // Add the row's optional value to the buffer; None contributes nothing.
>   def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2)
>   // Combine two partial buffers with the same None-tolerant sum.
>   def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v => b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2)
>   def finish(reduction: Option[Int]): Option[Int] = reduction
>   def bufferEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
>   def outputEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
> }.toColumn)
> ds3.printSchema
> ds3.show
> {code}
> I get a somewhat odd-looking schema as output, and after that the program just hangs, pinning one CPU at 100%. The data never shows.
> Output:
> {noformat}
> root
>  |-- value: string (nullable = true)
>  |-- $anon$1(scala.Tuple2): struct (nullable = true)
>  |    |-- value: integer (nullable = true)
> {noformat}