Posted to user@spark.apache.org by Patrick McCarthy <pm...@dstillery.com> on 2017/10/02 14:01:19 UTC

PySpark - Expand rows into dataframes via function

Hello,

I'm trying to map ARIN registry files into more explicit IP ranges. Each
record provides a starting IP and the number of IPs in the range (here it's
8192), and I'm trying to map it into all the included /24 subnets. For example,

Input:

array(['arin', 'US', 'ipv4', '23.239.160.0', 8192, 20131104.0, 'allocated',
       'ff26920a408f15613096aa7fe0ddaa57'], dtype=object)


Output:

array([['23', '239', '160', 'ff26920a408f15613096aa7fe0ddaa57'],
       ['23', '239', '161', 'ff26920a408f15613096aa7fe0ddaa57'],
       ['23', '239', '162', 'ff26920a408f15613096aa7fe0ddaa57'],

...


I have the input lookup table in a PySpark DataFrame, and a Python function
to do the conversion into the mapped output. I think to produce the
full mapping I need a UDTF (user-defined table function), but this concept
doesn't seem to exist in PySpark. What's the best approach to do this
mapping and recombine the results into a new DataFrame?


Thanks,

Patrick

Re: PySpark - Expand rows into dataframes via function

Posted by Sathish Kumaran Vairavelu <vs...@gmail.com>.
flatMap works too. The explode function is the SQL/DataFrame way of doing a
one-to-many operation. Both should work. Thanks
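
As a rough illustration of the explode route, the sketch below wraps a plain Python helper in a UDF that returns an array, then explodes that array into one row per /24. This is not code from the thread: the helper `slash24_prefixes`, the function `expand_with_explode`, and the column names `start_ip`, `ip_count`, and `subnet` are all hypothetical. The Spark part is only defined here, not executed, and needs pyspark installed when it is called.

```python
def slash24_prefixes(start_ip, ip_count):
    """List the 'a.b.c' prefix of every /24 touched by the range."""
    a, b, c, d = (int(p) for p in start_ip.split('.'))
    start = (a << 24) | (b << 16) | (c << 8) | d
    first = start >> 8                                  # index of the first /24
    last = (start + max(int(ip_count), 1) - 1) >> 8     # index of the last /24
    return ['%d.%d.%d' % (blk >> 16, (blk >> 8) & 255, blk & 255)
            for blk in range(first, last + 1)]


def expand_with_explode(df, ip_col='start_ip', count_col='ip_count'):
    """Return df with one row per /24 prefix (column names are assumptions)."""
    # Imported lazily so the helper above can be used without Spark.
    from pyspark.sql import functions as F
    from pyspark.sql.types import ArrayType, StringType

    prefixes_udf = F.udf(slash24_prefixes, ArrayType(StringType()))
    # explode() turns each array element into its own output row.
    return df.withColumn(
        'subnet', F.explode(prefixes_udf(F.col(ip_col), F.col(count_col))))
```

Calling `expand_with_explode(df)` on a DataFrame with those two columns would add a `subnet` column holding one /24 prefix per row; the registry ID and any other columns are carried along unchanged.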
(In reply to Patrick McCarthy's message of Tue, Oct 3, 2017 at 8:30 AM, which follows below.)

Re: PySpark - Expand rows into dataframes via function

Posted by Patrick McCarthy <pm...@dstillery.com>.
Thanks Sathish.

Before you responded, I came up with this solution:

# A function to take in one row and return the expanded ranges:
def processRow(x):
    ...
    return zip(list_of_ip_ranges, list_of_registry_ids)

# and then in Spark,

processed_rdds = spark_df_of_input_data.rdd.flatMap(lambda x: processRow(x))

processed_df = (processed_rdds.toDF()
                .withColumnRenamed('_1', 'ip')
                .withColumnRenamed('_2', 'registryid'))

And then after that I split and subset the IP column into what I wanted.
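
For readers who want something concrete in place of the elided function body, here is one possible sketch of a row-expanding function. It is not the code from this message: the name `process_row`, the column positions (start IP at index 3, count at index 4, registry ID at index 7, read off the example input in the original post), and the use of Python's standard `ipaddress` module are all assumptions.

```python
import ipaddress


# Hypothetical stand-in for the elided function body; the default column
# positions follow the example input row in the original post (an assumption).
def process_row(row, ip_idx=3, count_idx=4, id_idx=7):
    """Return one (subnet_base_ip, registry_id) tuple per /24 covered by the row."""
    start = int(ipaddress.IPv4Address(row[ip_idx]))
    count = max(int(row[count_idx]), 1)
    first = start // 256                  # index of the first /24 block
    last = (start + count - 1) // 256     # index of the last /24 block
    return [(str(ipaddress.IPv4Address(block * 256)), row[id_idx])
            for block in range(first, last + 1)]


example = ['arin', 'US', 'ipv4', '23.239.160.0', 8192, 20131104.0,
           'allocated', 'ff26920a408f15613096aa7fe0ddaa57']
expanded = process_row(example)
```

Because the function returns a list of tuples, it can be passed to `rdd.flatMap` in the same way as the function in the message, and the resulting `ip` strings can then be split on `.` to get the individual octets.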

(In reply to Sathish Kumaran Vairavelu's message of Mon, Oct 2, 2017 at 7:52 PM, which follows below.)

Re: PySpark - Expand rows into dataframes via function

Posted by Sathish Kumaran Vairavelu <vs...@gmail.com>.
It's possible with the array function combined with the struct constructor.
Below is a SQL example:

select array(struct(ip1, hashkey), struct(ip2, hashkey))
from (select substr(col1,1,2) as ip1, substr(col1,3,3) as ip2, etc, hashkey
      from object) a

If you want dynamic IP ranges, you need to dynamically construct the structs
based on the range values. Hope this helps.
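
One way to read "dynamically construct structs" is to generate the array(...) expression as a string before submitting the query. The sketch below does only that string-building step. The function name `array_of_structs_expr`, the table name `subnets`, and the assumption that the inner query exposes columns named ip1 through ipN are all hypothetical; wrapping the array in explode(...) is an addition that ties this back to the one-row-per-subnet goal.

```python
def array_of_structs_expr(n, key='hashkey'):
    """Build 'array(struct(ip1, key), ..., struct(ipN, key))' for n columns."""
    if n < 1:
        raise ValueError('n must be at least 1')
    structs = ', '.join('struct(ip%d, %s)' % (i, key) for i in range(1, n + 1))
    return 'array(%s)' % structs


expr = array_of_structs_expr(3)
# Wrapping the expression in explode(...) yields one output row per struct.
# 'subnets' is a placeholder table name.
query = 'select explode(%s) as pair from subnets' % expr
```

The generated text could then be passed to `spark.sql(...)`. Note that the number of structs is fixed per query here, so ranges of different sizes would each need their own generated expression.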


Thanks

Sathish
