Posted to user@spark.apache.org by Sandip Mehta <sa...@gmail.com> on 2017/12/08 03:51:58 UTC

Row Encoder For DataSet

Hi,

During my aggregation I end up with the following schema:

Row(Row(val1,val2), Row(val1,val2,val3...))

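// Imports needed by the snippets in this message
import org.apache.spark.sql.{Encoders, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val values = Seq(
  (Row(10, 11), Row(10, 2, 11)),
  (Row(10, 11), Row(10, 2, 11)),
  (Row(20, 11), Row(10, 2, 11))
)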


The first Row in each pair is the key used to group the relevant records for
aggregation. I used the following to create the dataset:

val s = StructType(Seq(
  StructField("x", IntegerType, true),
  StructField("y", IntegerType, true)
))
val s1 = StructType(Seq(
  StructField("u", IntegerType, true),
  StructField("v", IntegerType, true),
  StructField("z", IntegerType, true)
))

val ds = sparkSession.sqlContext.createDataset(
  sparkSession.sparkContext.parallelize(values)
)(Encoders.tuple(RowEncoder(s), RowEncoder(s1)))

Is this the correct way to represent it?

How do I create a dataset and a row encoder for this use case so that I can
call groupByKey on it?



Regards
Sandeep

Re: Row Encoder For DataSet

Posted by Tomasz Dudek <me...@gmail.com>.
Hello Sandeep,

You can pass a Row to a UDAF. Just provide a proper inputSchema for your UDAF.

Check out this example:
https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html
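
As a rough sketch of what that could look like (this is not taken from the linked page): the UDAF below declares an inputSchema of three integer columns, so update() receives them together as one Row. The names SumUvz and UdafSketch, the column names, and the choice of summing each field are assumptions made for illustration. It relies on UserDefinedAggregateFunction, the API available in Spark 2.x; Spark 3.x deprecates it in favour of Aggregator.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Hypothetical UDAF: takes columns (u, v, z) as one input Row and sums each field.
object SumUvz extends UserDefinedAggregateFunction {
  // Schema of the Row handed to update(); one field per column passed to the UDAF.
  def inputSchema: StructType = StructType(Seq(
    StructField("u", IntegerType),
    StructField("v", IntegerType),
    StructField("z", IntegerType)))

  // Running totals kept between rows.
  def bufferSchema: StructType = StructType(Seq(
    StructField("su", LongType),
    StructField("sv", LongType),
    StructField("sz", LongType)))

  // The result is itself a struct, so evaluate() returns a Row.
  def dataType: DataType = bufferSchema
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit =
    (0 until 3).foreach(i => buffer(i) = 0L)

  // Null inputs are skipped rather than treated as zero.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    (0 until 3).foreach { i =>
      if (!input.isNullAt(i)) buffer(i) = buffer.getLong(i) + input.getInt(i)
    }

  def merge(b1: MutableAggregationBuffer, b2: Row): Unit =
    (0 until 3).foreach(i => b1(i) = b1.getLong(i) + b2.getLong(i))

  def evaluate(buffer: Row): Any =
    Row(buffer.getLong(0), buffer.getLong(1), buffer.getLong(2))
}

object UdafSketch {
  // Builds the sample data from this thread as flat columns and aggregates per (x, y).
  def aggregate(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val df = Seq((10, 11, 10, 2, 11), (10, 11, 10, 2, 11), (20, 11, 10, 2, 11))
      .toDF("x", "y", "u", "v", "z")
    df.groupBy($"x", $"y").agg(SumUvz($"u", $"v", $"z").as("sums"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("udaf-sketch").getOrCreate()
    aggregate(spark).show(false)
    spark.stop()
  }
}
```

With the sample rows from the original message, group (10, 11) should come out as sums (20, 4, 22) and group (20, 11) as (10, 2, 11).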

Yours,
Tomasz

2017-12-10 11:55 GMT+01:00 Sandip Mehta <sa...@gmail.com>:

> Thanks Georg. I have looked at UDAFs based on your suggestion. It looks like
> you can only pass a single column to a UDAF. Is there any way to pass an
> entire Row to the aggregate function?
>
> I want to take a list of user-defined functions and a given Row object,
> perform the aggregation, and return the aggregated Row object.
>
> Regards
> Sandeep

Re: Row Encoder For DataSet

Posted by Sandip Mehta <sa...@gmail.com>.
Thanks Georg. I have looked at UDAFs based on your suggestion. It looks like
you can only pass a single column to a UDAF. Is there any way to pass an
entire Row to the aggregate function?

I want to take a list of user-defined functions and a given Row object,
perform the aggregation, and return the aggregated Row object.

Regards
Sandeep

On Fri, Dec 8, 2017 at 12:47 PM Georg Heiler <ge...@gmail.com>
wrote:

> You are looking for a UDAF.

Re: Row Encoder For DataSet

Posted by Georg Heiler <ge...@gmail.com>.
You are looking for a UDAF.

On Fri, Dec 8, 2017 at 06:20, Sandip Mehta <sa...@gmail.com> wrote:

> Hi,
>
> I want to group on certain columns and then apply a custom UDF to every
> group. Currently groupBy only allows adding aggregation functions to the
> grouped data it returns.
>
> For this I was thinking of using groupByKey, which returns a
> KeyValueGroupedDataset, and then applying the UDF to every group, but I have
> not been able to solve this.
>
> SM

Re: Row Encoder For DataSet

Posted by Sandip Mehta <sa...@gmail.com>.
Hi,

I want to group on certain columns and then apply a custom UDF to every
group. Currently groupBy only allows adding aggregation functions to the
grouped data it returns.

For this I was thinking of using groupByKey, which returns a
KeyValueGroupedDataset, and then applying the UDF to every group, but I have
not been able to solve this.
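
For reference, one way the groupByKey route described above could be sketched, assuming Spark 2.x as used in this thread, where RowEncoder(schema) yields an encoder for Row (newer releases may expose this differently). The object name GroupByKeySketch and the per-group logic (summing each value field) are assumptions for illustration; keySchema and valueSchema correspond to s and s1 from the original message. The key encoder is passed explicitly to groupByKey, and the result encoder to mapGroups.

```scala
import org.apache.spark.sql.{Dataset, Encoders, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object GroupByKeySketch {
  // Same shapes as s and s1 in the original message.
  val keySchema: StructType = StructType(Seq(
    StructField("x", IntegerType, true),
    StructField("y", IntegerType, true)))
  val valueSchema: StructType = StructType(Seq(
    StructField("u", IntegerType, true),
    StructField("v", IntegerType, true),
    StructField("z", IntegerType, true)))

  def aggregate(spark: SparkSession): Dataset[(Row, Row)] = {
    val values = Seq(
      (Row(10, 11), Row(10, 2, 11)),
      (Row(10, 11), Row(10, 2, 11)),
      (Row(20, 11), Row(10, 2, 11)))

    // Encoder for (key Row, value Row) pairs, built from the two schemas.
    val pairEncoder = Encoders.tuple(RowEncoder(keySchema), RowEncoder(valueSchema))
    val ds = spark.createDataset(values)(pairEncoder)

    // Group on the key Row, then fold each group's value Rows into one Row.
    // Illustrative logic only: sums each field and assumes no nulls in the data.
    ds.groupByKey(_._1)(RowEncoder(keySchema))
      .mapGroups { (key, rows) =>
        val (su, sv, sz) = rows.foldLeft((0, 0, 0)) { case ((a, b, c), (_, v)) =>
          (a + v.getInt(0), b + v.getInt(1), c + v.getInt(2))
        }
        (key, Row(su, sv, sz))
      }(pairEncoder)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("groupByKey-sketch").getOrCreate()
    aggregate(spark).collect().foreach(println)
    spark.stop()
  }
}
```

The function passed to mapGroups is where arbitrary per-group logic would go; it sees the whole key Row and an iterator over the group's pairs, so it is not limited to the built-in aggregation functions.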

SM

On Fri, Dec 8, 2017 at 10:29 AM Weichen Xu <we...@databricks.com>
wrote:

> You can groupBy multiple columns on a DataFrame, so why do you need such a
> complicated schema?
>
> Suppose the df schema is (x, y, u, v, z):
>
> df.groupBy($"x", $"y").agg(...)
>
> Is this what you want?

Re: Row Encoder For DataSet

Posted by Weichen Xu <we...@databricks.com>.
You can groupBy multiple columns on a DataFrame, so why do you need such a
complicated schema?

Suppose the df schema is (x, y, u, v, z):

df.groupBy($"x", $"y").agg(...)

Is this what you want?
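
A slightly fuller sketch of that suggestion, using the sample values from the original message flattened into columns (x, y, u, v, z). The object name GroupBySketch and the particular aggregates (sum and max) are assumptions chosen for illustration; any built-in aggregate function could go inside agg(...).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{max, sum}

object GroupBySketch {
  def aggregate(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Flat schema instead of a pair of nested Rows.
    val df = Seq((10, 11, 10, 2, 11), (10, 11, 10, 2, 11), (20, 11, 10, 2, 11))
      .toDF("x", "y", "u", "v", "z")

    // Group on both key columns and aggregate the remaining ones.
    df.groupBy($"x", $"y").agg(
      sum($"u").as("sum_u"),
      max($"v").as("max_v"),
      sum($"z").as("sum_z"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("groupBy-sketch").getOrCreate()
    aggregate(spark).show()
    spark.stop()
  }
}
```

For these rows, group (10, 11) should give sum_u = 20, max_v = 2, sum_z = 22, and group (20, 11) should give 10, 2, 11.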
