Posted to user@spark.apache.org by Abhishek Gayakwad <a....@gmail.com> on 2016/01/06 05:14:23 UTC
[Spark-SQL] Custom aggregate function for GroupedData
Hello Hivemind,
Referring to this thread:
https://forums.databricks.com/questions/956/how-do-i-group-my-dataset-by-a-key-or-combination.html
I learned that we cannot do much with grouped data apart from using the
existing aggregate functions. That post was written in May 2015, and I
don't know whether things have changed since then. I am using Spark
1.4.
What I am trying to achieve is very similar to Hive's collect_set
(more precisely, the unique values in sorted order, concatenated), e.g. turning
1,2
1,3
2,4
2,5
2,4
into
1, "2,3"
2, "4,5"
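A minimal plain-Java sketch of the intended semantics, independent of Spark, may help pin down what the aggregation should return: group by the first column, keep each key's distinct values, sort them, and join them with commas. The class and method names are hypothetical, and values are compared as strings, so "10" would sort before "2".

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

class CollectSortedUnique {

    // Groups (key, value) pairs by key; for each key keeps the distinct values
    // in sorted (lexicographic) order and joins them with commas.
    static Map<String, String> collectSortedUnique(List<String[]> pairs) {
        Map<String, SortedSet<String>> grouped = new TreeMap<>();
        for (String[] pair : pairs) {
            // TreeSet drops duplicates and keeps the values sorted.
            grouped.computeIfAbsent(pair[0], k -> new TreeSet<>()).add(pair[1]);
        }
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, SortedSet<String>> e : grouped.entrySet()) {
            result.put(e.getKey(), String.join(",", e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> input = Arrays.asList(
                new String[] {"1", "2"},
                new String[] {"1", "3"},
                new String[] {"2", "4"},
                new String[] {"2", "5"},
                new String[] {"2", "4"});
        // Prints {1=2,3, 2=4,5}
        System.out.println(collectSortedUnique(input));
    }
}
```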
Currently I do this by converting the DataFrame to an RDD, performing the
required operations, and converting the result back to a DataFrame, as shown below:
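// Imports for the Spark 1.4 Java API. SalesColumns is my own enum (not shown)
// that defines the column names and the output schema.
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;

import scala.Tuple2;

public class AvailableSizes implements Serializable {

    public DataFrame calculate(SQLContext ssc, DataFrame salesDataFrame) {
        final JavaRDD<Row> rowJavaRDD = salesDataFrame.toJavaRDD();

        // Key every row by its style, keeping only the output columns
        // (columns 0, 1 and 3 of the input row).
        JavaPairRDD<String, Row> pairs = rowJavaRDD.mapToPair(
                (PairFunction<Row, String, Row>) row -> {
                    final Object[] objects = {row.getAs(0), row.getAs(1), row.getAs(3)};
                    return new Tuple2<>(row.getAs(SalesColumns.STYLE.name()),
                            new GenericRowWithSchema(objects, SalesColumns.getOutputSchema()));
                });

        // Merge rows that share a style, combining their sizes into one
        // sorted, de-duplicated, comma-separated string.
        JavaPairRDD<String, Row> withSizeList = pairs.reduceByKey(new Function2<Row, Row, Row>() {
            @Override
            public Row call(Row aRow, Row bRow) {
                final String uniqueCommaSeparatedSizes = uniqueSizes(aRow, bRow);
                final Object[] objects = {aRow.getAs(0), aRow.getAs(1), uniqueCommaSeparatedSizes};
                return new GenericRowWithSchema(objects, SalesColumns.getOutputSchema());
            }

            // Splits both size strings, unions them in a TreeSet (which sorts and
            // removes duplicates) and joins the result back into a CSV string.
            private String uniqueSizes(Row aRow, Row bRow) {
                final SortedSet<String> allSizes = new TreeSet<>();
                final List<String> aSizes = Arrays.asList(((String) aRow.getAs(String.valueOf(SalesColumns.SIZE))).split(","));
                final List<String> bSizes = Arrays.asList(((String) bRow.getAs(String.valueOf(SalesColumns.SIZE))).split(","));
                allSizes.addAll(aSizes);
                allSizes.addAll(bSizes);
                return csvFormat(allSizes);
            }
        });

        // Drop the keys and rebuild a DataFrame with the output schema.
        final JavaRDD<Row> values = withSizeList.values();
        return ssc.createDataFrame(values, SalesColumns.getOutputSchema());
    }

    public String csvFormat(Collection<String> collection) {
        return collection.stream().map(Object::toString).collect(Collectors.joining(","));
    }
}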
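One option that stays within the Spark 1.4 RDD API is JavaPairRDD.aggregateByKey with a TreeSet as the per-key accumulator: sizes are added to a set and joined into a string once per key, instead of being split and re-joined on every reduce step. Spark's documentation for aggregateByKey allows both functions to modify and return their first argument, which is what the set-based functions below do. The sketch exercises the two functions such a call takes (seqOp within a partition, combOp across partitions) in plain Java, with a hypothetical local stand-in for the shuffle, so it runs without Spark; the class and method names are illustrative, and the Spark call itself appears only in a comment and is untested here.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// In Spark, with a JavaPairRDD<String, String> of (style, size), this would be roughly:
//   pairs.aggregateByKey(new TreeSet<String>(), SortedSetAccumulator::seqOp,
//                        SortedSetAccumulator::combOp)
//        .mapValues(SortedSetAccumulator::finish)
class SortedSetAccumulator {

    // seqOp: folds one size into this partition's set for the key.
    static TreeSet<String> seqOp(TreeSet<String> acc, String size) {
        acc.add(size);
        return acc;
    }

    // combOp: merges per-key sets produced by different partitions.
    static TreeSet<String> combOp(TreeSet<String> a, TreeSet<String> b) {
        a.addAll(b);
        return a;
    }

    // Joins once per key, after all merging is done.
    static String finish(TreeSet<String> sizes) {
        return String.join(",", sizes);
    }

    // Hypothetical local stand-in for the shuffle: each inner list is one
    // "partition" of (style, size) pairs. Each partition builds its own
    // per-key sets with seqOp; the partial sets are then merged with combOp.
    static Map<String, String> aggregate(List<List<String[]>> partitions) {
        Map<String, TreeSet<String>> merged = new HashMap<>();
        for (List<String[]> partition : partitions) {
            Map<String, TreeSet<String>> local = new HashMap<>();
            for (String[] pair : partition) {
                seqOp(local.computeIfAbsent(pair[0], k -> new TreeSet<>()), pair[1]);
            }
            for (Map.Entry<String, TreeSet<String>> e : local.entrySet()) {
                merged.merge(e.getKey(), e.getValue(), SortedSetAccumulator::combOp);
            }
        }
        Map<String, String> result = new TreeMap<>();
        merged.forEach((k, v) -> result.put(k, finish(v)));
        return result;
    }

    public static void main(String[] args) {
        List<List<String[]>> partitions = Arrays.asList(
                Arrays.asList(new String[] {"1", "2"}, new String[] {"2", "4"}),
                Arrays.asList(new String[] {"1", "3"}, new String[] {"2", "5"}, new String[] {"2", "4"}));
        // Prints {1=2,3, 2=4,5}
        System.out.println(aggregate(partitions));
    }
}
```

As with the TreeSet<String> in the original code, sizes sort lexicographically; numeric sizes would need a numeric type or a comparator.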
Please let me know if there is a better way of doing this.
Regards,
Abhishek
Re: [Spark-SQL] Custom aggregate function for GroupedData
Posted by Abhishek Gayakwad <a....@gmail.com>.
Thanks for replying, Michael. An Aggregator/UDAF is exactly what I am looking
for, but we are still on 1.4 and it's going to take time to move to 1.6.
On Wed, Jan 6, 2016 at 10:32 AM, Michael Armbrust <mi...@databricks.com>
wrote:
> In Spark 1.6 GroupedDataset
> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedDataset> has
> mapGroups, which sounds like what you are looking for. You can also write
> a custom Aggregator
> <https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html>
Re: [Spark-SQL] Custom aggregate function for GroupedData
Posted by Michael Armbrust <mi...@databricks.com>.
In Spark 1.6, GroupedDataset
<http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedDataset>
has mapGroups, which sounds like what you are looking for. You can also write
a custom Aggregator
<https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html>
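Spark 1.6's Aggregator is organized around four operations: zero (an empty buffer), reduce (fold one input into a buffer), merge (combine partial buffers) and finish (produce the output). The plain-Java interface below is a hypothetical stand-in that only mirrors that shape so the collect-set logic can be tested without Spark; it is not Spark's class, and a real implementation would extend org.apache.spark.sql.expressions.Aggregator, whose encoder requirements differ between Spark versions. The mapGroup method sketches the other suggestion: a function of a key and an iterator over all of that key's values, which is the shape mapGroups expects.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.TreeSet;

// Hypothetical interface mirroring the zero/reduce/merge/finish shape of
// Spark 1.6's Aggregator; NOT the Spark class itself.
interface SimpleAggregator<I, B, O> {
    B zero();                  // empty buffer for a new group or partition
    B reduce(B buffer, I in);  // fold one input value into a buffer
    B merge(B b1, B b2);       // combine partial buffers from different partitions
    O finish(B buffer);        // turn the final buffer into the output value
}

class SortedUniqueAggregator implements SimpleAggregator<String, TreeSet<String>, String> {
    public TreeSet<String> zero() { return new TreeSet<>(); }

    public TreeSet<String> reduce(TreeSet<String> b, String in) { b.add(in); return b; }

    public TreeSet<String> merge(TreeSet<String> b1, TreeSet<String> b2) { b1.addAll(b2); return b1; }

    public String finish(TreeSet<String> b) { return String.join(",", b); }

    // mapGroups-style alternative: all values for one key arrive together as an
    // iterator, so no merge step is needed, but every value must be shuffled.
    static String mapGroup(String key, Iterator<String> values) {
        TreeSet<String> sizes = new TreeSet<>();
        values.forEachRemaining(sizes::add);
        return key + ", \"" + String.join(",", sizes) + "\"";
    }

    public static void main(String[] args) {
        SortedUniqueAggregator agg = new SortedUniqueAggregator();
        // Two partial buffers for key 2, as if its rows were split across partitions.
        TreeSet<String> p1 = agg.reduce(agg.zero(), "4");
        TreeSet<String> p2 = agg.reduce(agg.reduce(agg.zero(), "5"), "4");
        System.out.println(agg.finish(agg.merge(p1, p2)));                     // 4,5
        System.out.println(mapGroup("1", Arrays.asList("3", "2").iterator())); // 1, "2,3"
    }
}
```

The Aggregator shape lets Spark combine partial results before the shuffle, whereas mapGroups is simpler to write but moves every value of a group to one place.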