Posted to user@spark.apache.org by Junaid Nasir <jn...@an10.io> on 2017/07/07 21:06:10 UTC

Iterate over grouped df to create new rows/df

Hi everyone,

I am kind of stuck on a problem and was hoping for some pointers or help :)
I have tried different things but couldn't achieve the desired results.

I want to *create a single row from multiple rows if those rows are
continuous* (based on time, i.e. if the next row's time is within 2 minutes
of the previous row's time).

So what I have is this df (after filtering and grouping):

+--------------------+---+-----+
|                time|val|group|
+--------------------+---+-----+
| 2017-01-01 00:00:00| 41|    1|
| 2017-01-01 00:01:00| 42|    1|
| 2017-01-01 00:02:00| 41|    1|
| 2017-01-01 00:15:00| 50|    1|
| 2017-01-01 00:18:00| 49|    1|
| 2017-01-01 00:19:00| 51|    1|
| 2017-01-01 00:20:00| 30|    1|
+--------------------+---+-----+

from which I want to compute another df

+--------------------+--------------------+-----+
|          start time|            end time|group|
+--------------------+--------------------+-----+
| 2017-01-01 00:00:00| 2017-01-01 00:02:00|    1|
| 2017-01-01 00:15:00| 2017-01-01 00:15:00|    1|
| 2017-01-01 00:18:00| 2017-01-01 00:20:00|    1|
+--------------------+--------------------+-----+

How do I achieve this? A UDAF with withColumn only works for aggregation
within a single row.
I am using Spark 2.1.0 on Zeppelin with PySpark.

Re: Iterate over grouped df to create new rows/df

Posted by ayan guha <gu...@gmail.com>.
Hi

Happy that my solution worked for you. The solution is a SQL trick to
identify the boundaries of a session; it has nothing to do with Spark
itself.

In the first step it calculates the time difference between two consecutive
rows. Then it assigns a number fg, a running count that stays the same for
records within 2 minutes of the previous record and increments where the
gap is larger. That is the heart of the solution, achieved by a running sum
over a condition. Once you have that final grouping, it is trivial to group
on it and take the min and max time for each group. Hope that explains it.
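
In DataFrame API terms, the same idea looks roughly like the sketch below
(a sketch only, not the exact code from the thread; it assumes the df1
frame built further down, and column names such as prev_t, dur and fg are
purely illustrative):

%pyspark
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("g").orderBy("t")

sessions = (
    df1
    # previous row's timestamp within the same group
    .withColumn("prev_t", F.lag("t").over(w))
    # gap to the previous row in minutes (0 for the first row of a group)
    .withColumn("dur", F.coalesce(
        (F.unix_timestamp("t") - F.unix_timestamp("prev_t")) / 60, F.lit(0)))
    # running sum that increments whenever the gap exceeds 2 minutes,
    # so rows that are "continuous" end up sharing the same fg value
    .withColumn("fg", F.sum(F.when(F.col("dur") > 2, 1).otherwise(0)).over(w))
    # one output row per (group, island): its first and last timestamp
    .groupBy("g", "fg")
    .agg(F.min("t").alias("start_time"), F.max("t").alias("end_time"))
)
sessions.show(truncate=False)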

Best
Ayan

On Mon, 10 Jul 2017 at 9:19 pm, Junaid Nasir <jn...@an10.io> wrote:

> Wow, thanks Ayan, it works like a charm. It is also a bit difficult for me
> to follow because I am new to Spark. Can you provide some explanation, or
> point me to a guide/SO question with more details?
> Thank you again for your time :)
>
> On Sat, Jul 8, 2017 at 8:06 PM, ayan guha <gu...@gmail.com> wrote:
>
>> Hi
>>
>> Mostly from SO (finding overlapping time ranges), adapted to Spark....
>>
>>
>> %pyspark
>> from pyspark.sql import Row
>> d = [Row(t="2017-01-01 00:00:00",v=42,g=1),
>>      Row(t="2017-01-01 00:01:00",v=42,g=1),
>>      Row(t="2017-01-01 00:02:00",v=42,g=1),
>>      Row(t="2017-01-01 00:15:00",v=42,g=1),
>>      Row(t="2017-01-01 00:18:00",v=42,g=1),
>>      Row(t="2017-01-01 00:19:00",v=42,g=1),
>>      Row(t="2017-01-01 00:20:00",v=42,g=1)
>>      ]
>> df = spark.sparkContext.parallelize(d).toDF()
>> df.registerTempTable("base")
>>
>> %pyspark
>> df1 = spark.sql("select g,v,cast(t as timestamp) t from base")
>> df1.registerTempTable("trn1")
>> df2 = spark.sql('''
>>   select g, fg, min(t) start, max(t) `end`
>>     from (select g, v, n, t, dur,
>>                  sum(case when dur > 2 then 1 else 0 end)
>>                      over (partition by g order by t) fg
>>             from (select g, v, n, t,
>>                          nvl((unix_timestamp(t) - unix_timestamp(n)) / 60, 0) dur
>>                     from (select g, v, t,
>>                                  lag(t) over (partition by g order by t) n
>>                             from trn1
>>                          ) x
>>                  ) y
>>          ) z
>>    group by g, fg
>> ''')
>> df2.show(truncate=False)
>>
>> +---+---+---------------------+---------------------+
>> |g  |fg |start                |end                  |
>> +---+---+---------------------+---------------------+
>> |1  |0  |2017-01-01 00:00:00.0|2017-01-01 00:02:00.0|
>> |1  |1  |2017-01-01 00:15:00.0|2017-01-01 00:15:00.0|
>> |1  |2  |2017-01-01 00:18:00.0|2017-01-01 00:20:00.0|
>> +---+---+---------------------+---------------------+
>>
>> On Sat, Jul 8, 2017 at 7:06 AM, Junaid Nasir <jn...@an10.io> wrote:
>>
>>> Hi everyone,
>>>
>>> I am kind of stuck on a problem and was hoping for some pointers or help
>>> :) I have tried different things but couldn't achieve the desired results.
>>>
>>> I want to *create a single row from multiple rows if those rows are
>>> continuous* (based on time, i.e. if the next row's time is within 2 minutes
>>> of the previous row's time).
>>>
>>> So what I have is this df (after filtering and grouping):
>>>
>>> +--------------------+---+-----+
>>> |                time|val|group|
>>> +--------------------+---+-----+
>>> | 2017-01-01 00:00:00| 41|    1|
>>> | 2017-01-01 00:01:00| 42|    1|
>>> | 2017-01-01 00:02:00| 41|    1|
>>> | 2017-01-01 00:15:00| 50|    1|
>>> | 2017-01-01 00:18:00| 49|    1|
>>> | 2017-01-01 00:19:00| 51|    1|
>>> | 2017-01-01 00:20:00| 30|    1|
>>> +--------------------+---+-----+
>>>
>>> from which I want to compute another df
>>>
>>> +--------------------+--------------------+-----+
>>> |          start time|            end time|group|
>>> +--------------------+--------------------+-----+
>>> | 2017-01-01 00:00:00| 2017-01-01 00:02:00|    1|
>>> | 2017-01-01 00:15:00| 2017-01-01 00:15:00|    1|
>>> | 2017-01-01 00:18:00| 2017-01-01 00:20:00|    1|
>>> +--------------------+--------------------+-----+
>>>
>>> How do I achieve this? A UDAF with withColumn only works for aggregation
>>> within a single row.
>>> I am using Spark 2.1.0 on Zeppelin with PySpark.
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
--
Best Regards,
Ayan Guha