Posted to issues@spark.apache.org by "Amit Sela (JIRA)" <ji...@apache.org> on 2016/07/18 16:54:20 UTC
[jira] [Updated] (SPARK-16607) Aggregator with null initialisation will result in null
[ https://issues.apache.org/jira/browse/SPARK-16607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Amit Sela updated SPARK-16607:
------------------------------
Description:
Java code example:
{code}
import java.util.Arrays;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;

import scala.Tuple2;

public class TestAggregatorJava {
  public static void main(String[] args) {
    SparkSession session = SparkSession.builder()
        .appName("TestAggregatorJava")
        .master("local[*]")
        .getOrCreate();

    // Three rows, all with key "a".
    Dataset<Tuple2<String, Integer>> ds1 = session.createDataset(Arrays.asList(
        new Tuple2<>("a", 1),
        new Tuple2<>("a", 2),
        new Tuple2<>("a", 3)
    ), Encoders.tuple(Encoders.STRING(), Encoders.INT()));

    // Values not greater than 1 become null, so ("a", 1) turns into ("a", null).
    Dataset<Tuple2<String, Integer>> ds2 = ds1.map(
        new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
          @Override
          public Tuple2<String, Integer> call(Tuple2<String, Integer> value) throws Exception {
            if (value._2() > 1) {
              return value;
            } else {
              return new Tuple2<>(value._1(), null);
            }
          }
        }, Encoders.tuple(Encoders.STRING(), Encoders.INT()));

    // Group by key and sum the non-null values using a buffer initialised to null.
    Dataset<Tuple2<String, Integer>> ds3 = ds2.groupByKey(
        new MapFunction<Tuple2<String, Integer>, String>() {
          @Override
          public String call(Tuple2<String, Integer> value) throws Exception {
            return value._1();
          }
        }, Encoders.STRING()).agg(new Aggregator<Tuple2<String, Integer>, Integer, Integer>() {
          @Override
          public Integer zero() {
            return null;
          }

          @Override
          public Integer reduce(Integer b, Tuple2<String, Integer> a) {
            return merge(b, a._2());
          }

          // Null-tolerant merge: a null buffer means "no value yet".
          @Override
          public Integer merge(Integer b1, Integer b2) {
            if (b1 == null) {
              return b2;
            } else if (b2 == null) {
              return b1;
            } else {
              return b1 + b2;
            }
          }

          @Override
          public Integer finish(Integer reduction) {
            return reduction;
          }

          @Override
          public Encoder<Integer> bufferEncoder() {
            return Encoders.INT();
          }

          @Override
          public Encoder<Integer> outputEncoder() {
            return Encoders.INT();
          }
        }.toColumn());

    ds3.printSchema();
    ds3.show();
  }
}
{code}
I get this schema:
{noformat}
root
 |-- value: string (nullable = true)
 |-- (scala.Tuple2): integer (nullable = true)
{noformat}
And this result, even though the non-null values for key "a" (2 and 3) should sum to 5:
{noformat}
+-----+--------------+
|value|(scala.Tuple2)|
+-----+--------------+
|    a|          null|
+-----+--------------+
{noformat}
The same happens in Scala. Scala's Int cannot be null (it defaults to 0), so wrap it in a case class to reproduce the issue, and you'll get the same result.
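As a sanity check on the expected value, here is a plain-Java sketch that does not use Spark. It replays the same zero/reduce/merge/finish logic over the values key "a" has after the map step (null, 2, 3).

The class name NullZeroFold and the helpers foldPartition and aggregate are hypothetical. The two-partition case is only a simplified stand-in for merging partial buffers and does not reproduce Spark's actual execution path. Under these assumptions the fold yields 5 rather than null.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Spark: replays the report's aggregator
// logic in plain Java to show which value the aggregation should produce.
public class NullZeroFold {

  // Same contract as the report's Aggregator: the buffer starts as null.
  static Integer zero() {
    return null;
  }

  // Null-tolerant merge, copied from the report.
  static Integer merge(Integer b1, Integer b2) {
    if (b1 == null) {
      return b2;
    } else if (b2 == null) {
      return b1;
    } else {
      return b1 + b2;
    }
  }

  // reduce() delegates to merge(), as in the report.
  static Integer reduce(Integer b, Integer value) {
    return merge(b, value);
  }

  static Integer finish(Integer reduction) {
    return reduction;
  }

  // Folds one "partition" of values, starting from zero().
  static Integer foldPartition(List<Integer> values) {
    Integer buffer = zero();
    for (Integer v : values) {
      buffer = reduce(buffer, v);
    }
    return buffer;
  }

  // Folds each partition separately, then merges the partial buffers
  // (a simplification of how partial aggregates are combined).
  static Integer aggregate(List<List<Integer>> partitions) {
    Integer merged = zero();
    for (List<Integer> partition : partitions) {
      merged = merge(merged, foldPartition(partition));
    }
    return finish(merged);
  }

  public static void main(String[] args) {
    // Values of key "a" after the map step: 1 became null, 2 and 3 are kept.
    List<Integer> values = Arrays.asList(null, 2, 3);
    System.out.println(aggregate(Arrays.asList(values))); // prints 5

    // Same values split so that one partition holds only the null.
    System.out.println(aggregate(Arrays.asList(
        Arrays.asList((Integer) null), Arrays.asList(2, 3)))); // prints 5
  }
}
```

Because merge treats a null buffer as "no value yet", the null zero never reaches the + operator, in either the single-partition or the split case. This suggests that the aggregator logic alone would not produce the null shown in the result above.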
> Aggregator with null initialisation will result in null
> --------------------------------------------------------
>
> Key: SPARK-16607
> URL: https://issues.apache.org/jira/browse/SPARK-16607
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Environment: spark 2.0-branch
> Reporter: Amit Sela
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)