Posted to issues@spark.apache.org by "Andrei (Jira)" <ji...@apache.org> on 2021/12/04 23:30:00 UTC

[jira] [Created] (SPARK-37547) Unexpected NullPointerException when Aggregator.finish returns null

Andrei created SPARK-37547:
------------------------------

             Summary: Unexpected NullPointerException when Aggregator.finish returns null
                 Key: SPARK-37547
                 URL: https://issues.apache.org/jira/browse/SPARK-37547
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.2.0, 3.1.2
            Reporter: Andrei


I'm migrating existing code (Java 8) from Spark 2.4 to Spark 3, and I'm seeing a NullPointerException when an Aggregator's finish method returns null for a custom output class.

I've created a simple snippet to reproduce the issue.
{code:java}
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkTest {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("name").setMaster("local[*]");
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
    List<String> data = Arrays.asList("1", "2", "3");
    Dataset<String> dataset = spark.createDataset(data, Encoders.STRING());
    Dataset<Row> aggDataset = dataset.groupBy("value").agg(new EntityAggregator().toColumn().name("agg"));
    aggDataset.show();
  }
}
{code}
{code:java}
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Aggregator;

public class EntityAggregator extends Aggregator<Row, EntityAgg, EntityAgg> {
  public EntityAgg zero() { return new EntityAgg(0L); }
  public EntityAgg reduce(EntityAgg agg, Row row) { return agg; }
  public EntityAgg merge(EntityAgg e1, EntityAgg e2) { return e1; }
  public Encoder<EntityAgg> bufferEncoder() { return Encoders.bean(EntityAgg.class); }
  public Encoder<EntityAgg> outputEncoder() { return Encoders.bean(EntityAgg.class); }
  public EntityAgg finish(EntityAgg reduction) { return null; }
}
{code}
{code:java}
public class EntityAgg {
  private long field;
  public EntityAgg() { }
  public EntityAgg(long field) { this.field = field; }
  public long getField() { return field; }
  public void setField(long field) { this.field = field; }
}
{code}
The expected behavior is to print a table like this:
{noformat}
+-----+----+
|value| agg|
+-----+----+
|    3|null|
|    1|null|
|    2|null|
+-----+----+
{noformat}
This code works fine on Spark 2.4 but fails with the following stack trace on Spark 3 (I tested 3.1.2 and 3.2.0):
{noformat}
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:49)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$5(AggregationIterator.scala:259)
    at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:85)
    at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:32)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748){noformat}
Another observation: if I change the output type from EntityAgg to String in the Aggregator, it works fine.
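For reference, the working variant looks like this (a sketch; the class name StringOutputAggregator is made up for illustration, and only the output type parameter, outputEncoder, and finish signature differ from the failing EntityAggregator above):
{code:java}
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Aggregator;

// Same buffer type (EntityAgg), but the final output is a String.
// finish still returns null, yet this version does not throw.
public class StringOutputAggregator extends Aggregator<Row, EntityAgg, String> {
  public EntityAgg zero() { return new EntityAgg(0L); }
  public EntityAgg reduce(EntityAgg agg, Row row) { return agg; }
  public EntityAgg merge(EntityAgg e1, EntityAgg e2) { return e1; }
  public Encoder<EntityAgg> bufferEncoder() { return Encoders.bean(EntityAgg.class); }
  public Encoder<String> outputEncoder() { return Encoders.STRING(); }
  public String finish(EntityAgg reduction) { return null; }
}
{code}
This suggests the NPE is specific to null results under the bean encoder (Encoders.bean), not to null aggregation results in general.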

I've found a test on GitHub that appears to cover this behavior: [https://github.com/apache/spark/blob/branch-3.1/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala#L338]

I haven't found a similar issue, so please point me to the open ticket if there is one.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
