You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Amit Sela (JIRA)" <ji...@apache.org> on 2016/07/18 17:05:20 UTC

[jira] [Commented] (SPARK-16607) Aggregator with null initialisation will result in null

    [ https://issues.apache.org/jira/browse/SPARK-16607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15382602#comment-15382602 ] 

Amit Sela commented on SPARK-16607:
-----------------------------------

Copy of the thread discussing this in the user mailing list:

Aggregator (Spark 2.0) skips aggregation is zero(0 returns null

Amit Sela <am...@gmail.com>
Jun 26

to user

Sometimes, the BUF for the aggregator may depend on the actual input.. and while this passes the responsibility to handle null in merge/reduce to the developer, it sounds fine to me if he is the one who put null in zero() anyway.
Now, it seems that the aggregation is skipped entirely when zero() = null. Not sure if that was the behaviour in 1.6

Is this behaviour wanted ? 

Thanks,
Amit

Aggregator example:

public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {

  @Override
  public Integer zero() {
    return null;
  }

  @Override
  public Integer reduce(Integer b, Tuple2<String, Integer> a) {
    if (b == null) {
      b = 0;
    }
    return b + a._2();
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
    if (b1 == null) {
      return b2;
    } else if (b2 == null) {
      return b1;
    } else {
      return b1 + b2;
    }
  }

Takeshi Yamamuro <li...@gmail.com>
Jun 26

to me, user

Hi,

This behaviour seems to be expected because you must ensure `b + zero() = b`
The your case `b + null = null` breaks this rule.
This is the same with v1.6.1.
See: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57

// maropu


Amit Sela <am...@gmail.com>
Jun 26

to Takeshi, user

Not sure about what's the rule in case of `b + null = null` but the same code works perfectly in 1.6.1, just tried it..


Takeshi Yamamuro
Jun 26

to me, user

Whatever it is, this is expected; if an initial value is null, spark codegen removes all the aggregates.
See: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199

// maropu


Amit Sela <am...@gmail.com>
Jun 26

to Takeshi, user

This "if (value == null)" condition you point to exists in 1.6 branch as well, so that's probably not the reason. 


Takeshi Yamamuro
Jun 26

to me, user

No, TypedAggregateExpression that uses Aggregator#zero is different between v2.0 and v1.6.
v2.0: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L91
v1.6: https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L115

// maropu


Amit Sela <am...@gmail.com>
Jun 27

to Takeshi, user

OK. I see that, but the current (provided) implementations are very naive - Sum, Count, Average -let's take Max for example: I guess zero() would be set to some value like Long.MIN_VALUE, but what if you trigger (I assume in the future Spark streaming will support time-based triggers) for a result and there are no events ? 

And like I said, for a more general use case: What if my zero() function depends on my input ?

I just don't see the benefit of this behaviour, though I realise this is the implementation. 

Thanks,
Amit


Koert Kuipers <ko...@tresata.com>
Jun 30

to me, Takeshi, user

its the difference between a semigroup and a monoid, and yes max does not easily fit into a monoid.

see also discussion here:
https://issues.apache.org/jira/browse/SPARK-15598


Amit Sela <am...@gmail.com>
Jul 2

to Koert, Takeshi, user

Thanks for pointing that Koert!

I understand now why zero() and not init(a: IN), though I still don't see a good reason to skip the aggregation if zero returns null. 
If the user did it, it's on him to take care of null cases in reduce/merge, but it opens-up the possibility to use the input to create the buffer for the aggregator. 
Wouldn't that at least enable the functionality discussed in SPARK-15598 ? without changing how the Aggregator works.

I bypassed it by using Optional (Guava) because I'm using the Java API, but it's a bit cumbersome...

Thanks,
Amit


Koert Kuipers
Jul 2

to me, Takeshi, user

valid functions can be written for reduce and merge when the zero is null. so not being able to provide null as the initial value is something troublesome.

i guess the proper way to do this is use Option, and have the None be the zero, which is what i assumed you did?
unfortunately last time i tried using scala Options with spark Aggregators it didnt work quite well. see:
https://issues.apache.org/jira/browse/SPARK-15810

lifting a semigroup into a monoid like this using Option is fairly typical, so either null or None has to work or else this api will be somewhat unpleasant to use for anything practical.

for an example of this lifting on a related Aggregator class:
https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala#L420

it would be nice to provide a similar convenience method for spark's Aggregator. basically if the user provides no zero the output is Option[OUT] instead of OUT, which spark translates into OUT being nullable.​
 

> Aggregator with null initialisation will result in null 
> --------------------------------------------------------
>
>                 Key: SPARK-16607
>                 URL: https://issues.apache.org/jira/browse/SPARK-16607
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>         Environment: spark 2.0-branch
>            Reporter: Amit Sela
>
> Java code example:
> {code}
>     SparkSession session = SparkSession.builder()
>                                        .appName("TestAggregatorJava")
>                                        .master("local[*]")
>                                        .getOrCreate();
>     Dataset<Tuple2<String, Integer>> ds1 = session.createDataset(Arrays.asList(
>             new Tuple2<>("a", 1),
>             new Tuple2<>("a", 2),
>             new Tuple2<>("a", 3)
>     ), Encoders.tuple(Encoders.STRING(), Encoders.INT()));
>     Dataset<Tuple2<String, Integer>> ds2 = ds1.map(
>         new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
>       @Override
>       public Tuple2<String, Integer> call(Tuple2<String, Integer> value) throws Exception {
>         if (value._2() > 1) {
>           return value;
>         } else {
>           return new Tuple2<>(value._1, null);
>         }
>       }
>     }, Encoders.tuple(Encoders.STRING(), Encoders.INT()));
>     Dataset<Tuple2<String, Integer>> ds3 = ds2.groupByKey(
>         new MapFunction<Tuple2<String,Integer>, String>() {
>       @Override
>       public String call(Tuple2<String, Integer> value) throws Exception {
>         return value._1();
>       }
>     }, Encoders.STRING()).agg(new Aggregator<Tuple2<String, Integer>, Integer, Integer>() {
>       @Override
>       public Integer zero() {
>         return null;
>       }
>       @Override
>       public Integer reduce(Integer b, Tuple2<String, Integer> a) {
>         return merge(b, a._2());
>       }
>       @Override
>       public Integer merge(Integer b1, Integer b2) {
>         if (b1 == null) {
>           return b2;
>         } else if (b2 == null){
>           return b1;
>         } else {
>           return b1 + b2;
>         }
>       }
>       @Override
>       public Integer finish(Integer reduction) {
>         return reduction;
>       }
>       @Override
>       public Encoder<Integer> bufferEncoder() {
>         return Encoders.INT();
>       }
>       @Override
>       public Encoder<Integer> outputEncoder() {
>         return Encoders.INT();
>       }
>     }.toColumn());
>     ds3.printSchema();
>     ds3.show();
>   }
> {code} 
> I get this schema:
> {noformat}
> root
>  |-- value: string (nullable = true)
>  |-- (scala.Tuple2): integer (nullable = true)
> {noformat}
> And this result:
> {noformat}
> +-----+--------------+
> |value|(scala.Tuple2)|
> +-----+--------------+
> |    a|          null|
> +-----+--------------+
> {noformat}
> The same happens with Scala, simply wrap Scala's Int with a case class (because it defaults to 0) and you'll get the same result.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org