Posted to issues@spark.apache.org by "Amit Sela (JIRA)" <ji...@apache.org> on 2016/07/18 16:54:20 UTC

[jira] [Updated] (SPARK-16607) Aggregator with null initialisation will result in null

     [ https://issues.apache.org/jira/browse/SPARK-16607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Amit Sela updated SPARK-16607:
------------------------------
    Description: 
Java code example:
{code}
    SparkSession session = SparkSession.builder()
                                       .appName("TestAggregatorJava")
                                       .master("local[*]")
                                       .getOrCreate();
    Dataset<Tuple2<String, Integer>> ds1 = session.createDataset(Arrays.asList(
            new Tuple2<>("a", 1),
            new Tuple2<>("a", 2),
            new Tuple2<>("a", 3)
    ), Encoders.tuple(Encoders.STRING(), Encoders.INT()));
    Dataset<Tuple2<String, Integer>> ds2 = ds1.map(
        new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

      @Override
      public Tuple2<String, Integer> call(Tuple2<String, Integer> value) throws Exception {
        if (value._2() > 1) {
          return value;
        } else {
          return new Tuple2<>(value._1(), null);
        }
      }
    }, Encoders.tuple(Encoders.STRING(), Encoders.INT()));
    Dataset<Tuple2<String, Integer>> ds3 = ds2.groupByKey(
        new MapFunction<Tuple2<String,Integer>, String>() {

      @Override
      public String call(Tuple2<String, Integer> value) throws Exception {
        return value._1();
      }
    }, Encoders.STRING()).agg(new Aggregator<Tuple2<String, Integer>, Integer, Integer>() {
      @Override
      public Integer zero() {
        return null;
      }

      @Override
      public Integer reduce(Integer b, Tuple2<String, Integer> a) {
        return merge(b, a._2());
      }

      @Override
      public Integer merge(Integer b1, Integer b2) {
        if (b1 == null) {
          return b2;
        } else if (b2 == null) {
          return b1;
        } else {
          return b1 + b2;
        }
      }

      @Override
      public Integer finish(Integer reduction) {
        return reduction;
      }

      @Override
      public Encoder<Integer> bufferEncoder() {
        return Encoders.INT();
      }

      @Override
      public Encoder<Integer> outputEncoder() {
        return Encoders.INT();
      }
    }.toColumn());

    ds3.printSchema();
    ds3.show();
  }
{code} 
I get this schema:
{noformat}
root
 |-- value: string (nullable = true)
 |-- (scala.Tuple2): integer (nullable = true)
{noformat}
And this result:
{noformat}
+-----+--------------+
|value|(scala.Tuple2)|
+-----+--------------+
|    a|          null|
+-----+--------------+
{noformat}

The same happens in Scala. Because a plain Int cannot be null (it defaults to 0), wrap Scala's Int in a case class so that the zero value can be null, and you'll get the same result.
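
For comparison, here is a minimal sketch of what the aggregator's own zero/reduce/merge logic produces when it is replayed in plain Java, without Spark. The class name NullZeroFold and the fold helper are hypothetical and exist only for this illustration; the merge method is copied from the example above. It assumes the values reaching the aggregator for key "a" are null, 2 and 3, which is what the map step in the example returns.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class (not part of Spark or of the issue): replays the
// aggregator's zero/reduce/finish logic over plain Java values.
public class NullZeroFold {

    // Same null-tolerant merge as in the Aggregator above.
    static Integer merge(Integer b1, Integer b2) {
        if (b1 == null) {
            return b2;
        } else if (b2 == null) {
            return b1;
        } else {
            return b1 + b2;
        }
    }

    // Start from a null buffer, as zero() does, and fold each value in with merge.
    static Integer fold(List<Integer> values) {
        Integer buffer = null;
        for (Integer v : values) {
            buffer = merge(buffer, v);
        }
        return buffer;
    }

    public static void main(String[] args) {
        // Values for key "a" after the map step: 1 was replaced by null, 2 and 3 were kept.
        System.out.println(fold(Arrays.<Integer>asList(null, 2, 3))); // prints 5
    }
}
```

Under that assumption the logic itself yields 5 for key "a", so the null in the result above does not come from the merge function.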


> Aggregator with null initialisation will result in null 
> --------------------------------------------------------
>
>                 Key: SPARK-16607
>                 URL: https://issues.apache.org/jira/browse/SPARK-16607
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>         Environment: spark 2.0-branch
>            Reporter: Amit Sela
>


