Posted to user@spark.apache.org by Soheil Pourbafrani <so...@gmail.com> on 2021/02/06 22:45:14 UTC

Converting RelationalGroupedDataset to DataFrame

Hi,

In my problem, I need to group a DataFrame, apply business logic to
each group, and finally emit a new DataFrame based on the result. In
detail: there is a DataFrame of devices (deviceDF below) that contains the
timestamps at which each device was turned on (on) and turned off (off).

+---------+------+--------------------+
|device_id|state |d_ts                |
+---------+------+--------------------+
|1        |off   |2020-09-10 16:14:58 |
|1        |on    |2020-09-19 16:14:58 |
|2        |on    |2020-09-20 16:14:58 |
|2        |off   |2020-10-03 16:14:58 |
|4        |on    |2020-09-20 16:14:58 |
|5        |off   |2020-09-20 16:14:58 |
+---------+------+--------------------+

On the other hand, there is a DataFrame of events (eventDF below)
containing each event's timestamp and its corresponding device.

+-----+---------+--------------------+
|e_id |device_id|e_ts                |
+-----+---------+--------------------+
|1    |1        |2020-09-20 16:14:58 |
|2    |2        |2020-10-08 09:19:55 |
|3    |4        |2020-11-01 12:15:37 |
|4    |5        |2020-10-08 01:35:08 |
+-----+---------+--------------------+

Joining the two DataFrames on device_id gives:

+---------+-----+--------------------+------+--------------------+
|device_id|e_id |e_ts                |state |d_ts                |
+---------+-----+--------------------+------+--------------------+
|1        |1    |2020-09-20 16:14:58 |off   |2020-09-10 16:14:58 |
|1        |1    |2020-09-20 16:14:58 |on    |2020-09-19 16:14:58 |
|2        |2    |2020-10-08 09:19:55 |on    |2020-09-20 16:14:58 |
|2        |2    |2020-10-08 09:19:55 |off   |2020-10-03 16:14:58 |
|4        |3    |2020-11-01 12:15:37 |on    |2020-09-20 16:14:58 |
|5        |4    |2020-10-08 01:35:08 |off   |2020-09-20 16:14:58 |
+---------+-----+--------------------+------+--------------------+

What I finally need to find is the events that happened while their
corresponding device was on. In the table above, event_id 1 is valid
because it happened at 2020-09-20 16:14:58 and its device had been on
since 2020-09-19 16:14:58; event_id 2 is not valid because its device was
turned off at 2020-10-03 16:14:58 and never turned on again; and so on.
The result is the following table:

+---------+-----+-------------------+
|device_id|e_id |       e_ts        |
+---------+-----+-------------------+
|1        |1    |2020-09-20 16:14:58|
|4        |3    |2020-11-01 12:15:37|
+---------+-----+-------------------+
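Stated as a rule: for each event, take the device row with the greatest
d_ts that is at or before e_ts, and keep the event only if that row's
state is on. For reference, here is a rough sketch of that rule with the
RDD API, which is the approach I want to move away from (joinedDF is a
placeholder for eventDF.join(deviceDF, "device_id"), and I assume integer
IDs):

import java.sql.Timestamp

val valid = joinedDF.rdd
  .groupBy(_.getAs[Int]("e_id"))
  .flatMap { case (_, rows) =>
    val event = rows.head
    val eTs = event.getAs[Timestamp]("e_ts")
    // latest device state change at or before the event time
    val latest = rows
      .filter(r => !r.getAs[Timestamp]("d_ts").after(eTs))
      .toSeq.sortBy(_.getAs[Timestamp]("d_ts").getTime)
      .lastOption
    // keep the event only if that change turned the device on
    latest.filter(_.getAs[String]("state") == "on").map(_ => event)
  }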

I did the following to group the joined table based on device:

val grouped = eventDF
      .join(deviceDF, "device_id")
      .groupBy("device_id")

which results in a RelationalGroupedDataset. Now I need to apply the logic
to each group and emit the resulting DataFrame, but I haven't found a way
to do that. I looked into UDAFs, but they didn't work for my case. I know
how to solve this with the RDD API; what I'm looking for is a *Column API*
approach. Any help or suggestion will be appreciated.
Thanks

Re: Converting RelationalGroupedDataset to DataFrame

Posted by Stephane Verlet <st...@verlet.name>.
Once you have a RelationalGroupedDataset, you can use agg() to perform
group-wide operations such as max, sum, etc., or even a custom aggregator:

df.groupBy(....).agg(sum(col(...)))

That will return a DataFrame with your groupBy columns and the result of
the aggregation.
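Applied to your example, that could look like the sketch below (untested;
joined stands for your eventDF.join(deviceDF, "device_id"); max over a
struct picks the row with the latest d_ts because structs compare field
by field):

import org.apache.spark.sql.functions._

val validEvents = joined
  .filter(col("d_ts") <= col("e_ts"))   // only state changes at or before the event
  .groupBy("device_id", "e_id", "e_ts")
  // latest (d_ts, state) pair per event
  .agg(max(struct(col("d_ts"), col("state"))).as("latest"))
  .filter(col("latest.state") === "on") // latest change must have turned the device on
  .select("device_id", "e_id", "e_ts")

A window partitioned by e_id (row_number() ordered by d_ts descending,
keep rank 1 where state is "on") would work just as well if you prefer
window functions.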
Stephane