Posted to dev@spark.apache.org by Alexander Pivovarov <ap...@gmail.com> on 2016/06/07 22:58:47 UTC
Dataset API agg question
I'm trying to switch from the RDD API to the Dataset API.
My question is about the reduceByKey method.
E.g. in the following example I'm trying to rewrite
sc.parallelize(Seq(1->2, 1->5, 3->6)).reduceByKey(math.max).take(10)
using the DS API. This is what I have so far:
Seq(1->2, 1->5, 3->6).toDS.groupBy(_._1).agg(max($"_2").as(ExpressionEncoder[Int])).take(10)
Questions:
1. Is it possible to avoid typing "as(ExpressionEncoder[Int])", or to replace it with something shorter?
2. Why do I have to use a String column name in the max function, e.g. $"_2" or col("_2")? Can I use _._2 instead?
Alex
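[For reference, a direct Dataset analogue of the reduceByKey line above can be written with groupByKey/reduceGroups and no ExpressionEncoder at all. This is a sketch assuming Spark 2.x, where the typed grouping method is named groupByKey (it was groupBy in the 1.6 experimental Dataset API):]

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local").getOrCreate()
import spark.implicits._

// Keep the data as typed tuples and reduce each group with a max on the
// second element, mirroring RDD.reduceByKey(math.max).
val result = Seq(1 -> 2, 1 -> 5, 3 -> 6).toDS
  .groupByKey(_._1)
  .reduceGroups((a, b) => if (a._2 >= b._2) a else b)
  .map(_._2)  // reduceGroups yields (key, reducedTuple); drop the redundant key
  .take(10)
// result contains (1, 5) and (3, 6), in some order
```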
Re: Dataset API agg question
Posted by Reynold Xin <rx...@databricks.com>.
Take a look at the implementation of typed sum/avg:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
You can implement a typed max/min.
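[A sketch of such a typed max, modeled on the typed.sum implementation linked above; the TypedMax/typedMax names are illustrative, not part of Spark, and this follows the Spark 2.x Aggregator contract:]

```scala
import org.apache.spark.sql.{Encoder, Encoders, TypedColumn}
import org.apache.spark.sql.expressions.Aggregator

// Typed max over Int values extracted from each input row, following the
// same Aggregator pattern as org.apache.spark.sql.expressions.scalalang.typed.
class TypedMax[IN](val f: IN => Int) extends Aggregator[IN, Int, Int] {
  def zero: Int = Int.MinValue
  def reduce(b: Int, a: IN): Int = math.max(b, f(a))
  def merge(b1: Int, b2: Int): Int = math.max(b1, b2)
  def finish(reduction: Int): Int = reduction
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

object typedMax {
  def apply[IN](f: IN => Int): TypedColumn[IN, Int] = new TypedMax(f).toColumn
}
```

[With this in scope, the original example becomes
Seq(1->2, 1->5, 3->6).toDS.groupByKey(_._1).agg(typedMax[(Int, Int)](_._2)).take(10)
with no explicit ExpressionEncoder.]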
Re: Dataset API agg question
Posted by Alexander Pivovarov <ap...@gmail.com>.
Ted, it does not work like that;
you have to .map(toAB).toDS
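[Spelled out, this is roughly what the .map(toAB).toDS approach looks like; AB and toAB are illustrative names standing in for whatever case class you map into, not anything from Spark:]

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

val spark = SparkSession.builder.master("local").getOrCreate()
import spark.implicits._

// Map each tuple into a case class so the resulting Dataset has named columns.
case class AB(a: Int, b: Int)
val toAB = (p: (Int, Int)) => AB(p._1, p._2)

val ds = Seq(1 -> 2, 1 -> 5, 3 -> 6).map(toAB).toDS
ds.groupBy($"a").agg(max($"b")).take(10)
```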
Re: Dataset API agg question
Posted by Ted Yu <yu...@gmail.com>.
Have you tried the following?
Seq(1->2, 1->5, 3->6).toDS("a", "b")
Then you can refer to columns by name.
FYI