You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "koert kuipers (JIRA)" <ji...@apache.org> on 2016/12/05 04:15:58 UTC

[jira] [Commented] (SPARK-18711) NPE in generated SpecificMutableProjection for Aggregator

    [ https://issues.apache.org/jira/browse/SPARK-18711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15721175#comment-15721175 ] 

koert kuipers commented on SPARK-18711:
---------------------------------------

simplified:
{noformat}
  case class Holder(i: Int)

  val agg1 = new Aggregator[Int, Tuple1[Holder], Seq[(String, Int, Int)]] {
    def zero: Tuple1[Holder] = Tuple1(Holder(1))

    def reduce(b: Tuple1[Holder], a: Int): Tuple1[Holder] = Tuple1(Holder(1))

    def merge(b1: Tuple1[Holder], b2: Tuple1[Holder]): Tuple1[Holder] = Tuple1(Holder(1))

    def finish(reduction: Tuple1[Holder]): Seq[(String, Int, Int)] = Seq(("ha", 0, 0))

    def bufferEncoder: Encoder[Tuple1[Holder]] = ExpressionEncoder[Tuple1[Holder]]()

    def outputEncoder: Encoder[Seq[(String, Int, Int)]] = ExpressionEncoder[Seq[(String, Int, Int)]]()
  }

  val x = Seq(("a", 1), ("a", 2))
    .toDS
    .groupByKey(_._1)
    .mapValues(_._2)
    .agg(agg1.toColumn)
  x.printSchema
  x.show
{noformat}

error is still the same:
{noformat}
org.apache.spark.executor.Executor: Exception in task 1.0 in stage 146.0 (TID 423)
java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:223)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:221)
	at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:159)
	at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
org.apache.spark.scheduler.TaskSetManager: Task 1 in stage 146.0 failed 1 times; aborting job
{noformat}

> NPE in generated SpecificMutableProjection for Aggregator
> ---------------------------------------------------------
>
>                 Key: SPARK-18711
>                 URL: https://issues.apache.org/jira/browse/SPARK-18711
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: koert kuipers
>
> this is a bug in branch-2.1, but I don't think it was in 2.1.0-rc1
> code (contrived, but based on real code we run):
> {noformat}
>   case class Holder(i: Int)
>   val agg1 = new Aggregator[Int, Tuple1[Option[Holder]], Seq[(String, Int, Int)]] {
>     def zero: Tuple1[Option[Holder]] = {
>       val x = Tuple1(None)
>       println(s"zero ${x}")
>       x
>     }
>     def reduce(b: Tuple1[Option[Holder]], a: Int): Tuple1[Option[Holder]] = {
>       println(s"reduce ${b} ${a}")
>       Tuple1(Some(Holder(b._1.map(_.i + a).getOrElse(a))))
>     }
>     def merge(b1: Tuple1[Option[Holder]], b2: Tuple1[Option[Holder]]): Tuple1[Option[Holder]] = {
>       println(s"merge ${b1} ${b2}")
>       (b1._1, b2._1) match {
>         case (Some(Holder(i1)), Some(Holder(i2))) => Tuple1(Some(Holder(i1 + i2)))
>         case (Some(Holder(i1)), _) => Tuple1(Some(Holder(i1)))
>         case (_, Some(Holder(i2))) => Tuple1(Some(Holder(i2)))
>         case _ => Tuple1(None)
>       }
>     }
>     def finish(reduction: Tuple1[Option[Holder]]): Seq[(String, Int, Int)] = {
>       println(s"finish ${reduction}")
>       Seq(("ha", reduction._1.get.i, 0))
>     }
>     def bufferEncoder: Encoder[Tuple1[Option[Holder]]] = ExpressionEncoder[Tuple1[Option[Holder]]]()
>     def outputEncoder: Encoder[Seq[(String, Int, Int)]] = ExpressionEncoder[Seq[(String, Int, Int)]]()
>   }
>   val x = Seq(("a", 1), ("a", 2))
>     .toDS
>     .groupByKey(_._1)
>     .mapValues(_._2)
>     .agg(agg1.toColumn)
>   x.printSchema
>   x.show
> {noformat}
> result is:
> {noformat}
> org.apache.spark.executor.Executor: Exception in task 1.0 in stage 146.0 (TID 423)
> java.lang.NullPointerException
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply_0$(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
> 	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:223)
> 	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:221)
> 	at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:159)
> 	at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:99)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
> the error seems to be in the code generation for the aggregator result.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org