You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this text conversion.
Posted to issues@spark.apache.org by "Herman van Hovell (JIRA)" <ji...@apache.org> on 2016/12/05 19:38:58 UTC
[jira] [Resolved] (SPARK-18711) NPE in generated SpecificMutableProjection for Aggregator
[ https://issues.apache.org/jira/browse/SPARK-18711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Herman van Hovell resolved SPARK-18711.
---------------------------------------
Resolution: Fixed
Assignee: Wenchen Fan
Fix Version/s: 2.1.0
> NPE in generated SpecificMutableProjection for Aggregator
> ---------------------------------------------------------
>
> Key: SPARK-18711
> URL: https://issues.apache.org/jira/browse/SPARK-18711
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.0
> Reporter: koert kuipers
> Assignee: Wenchen Fan
> Fix For: 2.1.0
>
>
> This is a bug in branch-2.1, but I don't think it was present in 2.1.0-rc1.
> Code (contrived, but based on real code we run):
> {noformat}
> case class Holder(i: Int)
> val agg1 = new Aggregator[Int, Tuple1[Option[Holder]], Seq[(String, Int, Int)]] {
> def zero: Tuple1[Option[Holder]] = {
> val x = Tuple1(None)
> println(s"zero ${x}")
> x
> }
> def reduce(b: Tuple1[Option[Holder]], a: Int): Tuple1[Option[Holder]] = {
> println(s"reduce ${b} ${a}")
> Tuple1(Some(Holder(b._1.map(_.i + a).getOrElse(a))))
> }
> def merge(b1: Tuple1[Option[Holder]], b2: Tuple1[Option[Holder]]): Tuple1[Option[Holder]] = {
> println(s"merge ${b1} ${b2}")
> (b1._1, b2._1) match {
> case (Some(Holder(i1)), Some(Holder(i2))) => Tuple1(Some(Holder(i1 + i2)))
> case (Some(Holder(i1)), _) => Tuple1(Some(Holder(i1)))
> case (_, Some(Holder(i2))) => Tuple1(Some(Holder(i2)))
> case _ => Tuple1(None)
> }
> }
> def finish(reduction: Tuple1[Option[Holder]]): Seq[(String, Int, Int)] = {
> println(s"finish ${reduction}")
> Seq(("ha", reduction._1.get.i, 0))
> }
> def bufferEncoder: Encoder[Tuple1[Option[Holder]]] = ExpressionEncoder[Tuple1[Option[Holder]]]()
> def outputEncoder: Encoder[Seq[(String, Int, Int)]] = ExpressionEncoder[Seq[(String, Int, Int)]]()
> }
> val x = Seq(("a", 1), ("a", 2))
> .toDS
> .groupByKey(_._1)
> .mapValues(_._2)
> .agg(agg1.toColumn)
> x.printSchema
> x.show
> {noformat}
> The result is:
> {noformat}
> org.apache.spark.executor.Executor: Exception in task 1.0 in stage 146.0 (TID 423)
> java.lang.NullPointerException
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply_0$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
> at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:223)
> at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:221)
> at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:159)
> at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The error seems to be in the code generation for the aggregator result.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org