Posted to dev@spark.apache.org by Li Jin <ic...@gmail.com> on 2019/02/19 22:30:34 UTC

Thoughts on dataframe cogroup?

Hi,

We have been using Pyspark's groupby().apply() quite a bit and it has been
very helpful in integrating Spark with our existing pandas-heavy libraries.

Recently, we have found more and more cases where groupby().apply() is not
sufficient. In some cases, we want to group two dataframes by the same
key, and apply a function which takes two pd.DataFrames (and also returns a
pd.DataFrame) for each key. This feels very much like the "cogroup"
operation in the RDD API.

It would be great to be able to do something like this (not an actual API,
just to explain the use case):

@pandas_udf(return_schema, ...)
def my_udf(pdf1, pdf2):
    # pdf1 and pdf2 are the subsets of the original dataframes that are
    # associated with a particular key
    result = ...  # some code that uses pdf1 and pdf2
    return result

df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
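
To make the intended semantics concrete, here is a minimal sketch in plain Python (standard library only), with lists of dicts standing in for dataframes. The names cogroup, apply_cogrouped and summarize are hypothetical helpers invented for this illustration; this is not Spark or pandas code, only a model of "one call per key, with both groups".

```python
from collections import defaultdict


def cogroup(left, right, key):
    """Group two lists of dict records by `key` and pair up the groups.

    Returns {key_value: (left_rows, right_rows)}. A key present on only one
    side gets an empty list for the other side.
    """
    groups = defaultdict(lambda: ([], []))
    for row in left:
        groups[row[key]][0].append(row)
    for row in right:
        groups[row[key]][1].append(row)
    return dict(groups)


def apply_cogrouped(left, right, key, func):
    """Call func(left_rows, right_rows) once per key and concatenate the results."""
    out = []
    for _, (l_rows, r_rows) in sorted(cogroup(left, right, key).items(),
                                      key=lambda kv: kv[0]):
        out.extend(func(l_rows, r_rows))
    return out


# Hypothetical sample data: two "dataframes" sharing the key column "id".
trades = [{"id": 1, "qty": 10}, {"id": 1, "qty": 5}, {"id": 2, "qty": 7}]
quotes = [{"id": 1, "px": 2.0}, {"id": 3, "px": 9.0}]


def summarize(l_rows, r_rows):
    # Plays the role of my_udf: sees both groups for one key at a time.
    key_value = (l_rows or r_rows)[0]["id"]
    return [{"id": key_value, "n_left": len(l_rows), "n_right": len(r_rows)}]


result = apply_cogrouped(trades, quotes, "id", summarize)
```

Note that keys 2 and 3 each appear on one side only, and the function is still called for them with an empty group on the other side.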

I have searched around the problem and some people have suggested joining
the tables first. However, it's often not the same pattern, and it is hard to
get it to work by using joins.
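
A small sketch of why the join route is awkward, again in plain Python with lists of dicts standing in for dataframes. The inner_join helper and the sample data are hypothetical and only for illustration: with n rows on one side and m on the other for the same key, an equi-join produces n * m rows, and the two original groups then have to be recovered from that product.

```python
from itertools import product


def inner_join(left, right, key):
    """Naive equi-join: one output row per matching (left, right) pair."""
    return [{**l, **r} for l, r in product(left, right) if l[key] == r[key]]


left = [{"id": 1, "a": i} for i in range(3)]    # n = 3 rows for key 1
right = [{"id": 1, "b": j} for j in range(4)]   # m = 4 rows for key 1

joined = inner_join(left, right, "id")          # n * m = 12 rows

# Getting the two groups back means de-duplicating each side again.
left_back = sorted({row["a"] for row in joined})
right_back = sorted({row["b"] for row in joined})
```

The de-duplication step would also collapse genuinely duplicated input rows, which is one more reason the round trip through a join is not equivalent to handing the function the two groups directly.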

I wonder what people's thoughts are on this?

Li

Re: Thoughts on dataframe cogroup?

Posted by Bryan Cutler <cu...@gmail.com>.
Apologies for not leaving feedback yet. I'm a little swamped this week with
the Spark Summit, but this is at the top of my list to get to for next week.

Bryan

On Thu, Apr 18, 2019 at 4:18 AM Chris Martin <ch...@cmartinit.co.uk> wrote:

> Yes, totally agreed with Li here.
>
> For clarity, I'm happy to do the work to implement this, but it would be
> good to get feedback from the community in general and some of the Spark
> committers in particular.
>
> thanks,
>
> Chris
>
> On Wed, Apr 17, 2019 at 9:17 PM Li Jin <ic...@gmail.com> wrote:
>
>> I have left some comments. This looks like a good proposal to me.
>>
>> As a heavy pyspark user, this is a pattern that we see over and over
>> again, and I think it could be of pretty high value to other pyspark users
>> as well. The fact that Chris and I came to the same idea sort of verifies
>> my intuition. Also, this isn't really something new: RDD has had a cogroup
>> function from very early on.
>>
>> With that being said, I'd like to call again for the community's feedback
>> on the proposal.
>>
>> On Mon, Apr 15, 2019 at 4:57 PM Chris Martin <ch...@cmartinit.co.uk>
>> wrote:
>>
>>> Ah sorry- I've updated the link which should give you access.  Can you
>>> try again now?
>>>
>>> thanks,
>>>
>>> Chris
>>>
>>>
>>>
>>> On Mon, Apr 15, 2019 at 9:49 PM Li Jin <ic...@gmail.com> wrote:
>>>
>>>> Hi Chris,
>>>>
>>>> Thanks! The permission to the google doc is maybe not set up properly.
>>>> I cannot view the doc by default.
>>>>
>>>> Li
>>>>
>>>> On Mon, Apr 15, 2019 at 3:58 PM Chris Martin <ch...@cmartinit.co.uk>
>>>> wrote:
>>>>
>>>>> I've updated the jira so that the main body is now inside a google
>>>>> doc.  Anyone should be able to comment- if you want/need write access
>>>>> please drop me a mail and I can add you.
>>>>>
>>>>> Ryan- regarding your specific point regarding why I'm not proposing to
>>>>> add this to the Scala API, I think the main point is that Scala users can
>>>>> already use Cogroup for Datasets.  For Scala this is probably a better
>>>>> solution as (as far as I know) there is no Scala DataFrame library that
>>>>> could be used in place of Pandas for manipulating  local DataFrames. As a
>>>>> result you'd probably be left with dealing with Iterators of Row objects,
>>>>> which almost certainly isn't what you'd want. This is similar to the
>>>>> existing grouped map Pandas Udfs for which there is no equivalent Scala Api.
>>>>>
>>>>> I do think there might be a place for allowing a (Scala) DataSet
>>>>> Cogroup to take some sort of grouping expression as the grouping key  (this
>>>>> would mean that you wouldn't have to marshal the key into a JVM object and
>>>>> could possibly lend itself to some catalyst optimisations) but I don't
>>>>> think that this should be done as part of this SPIP.
>>>>>
>>>>> thanks,
>>>>>
>>>>> Chris
>>>>>
>>>>> On Mon, Apr 15, 2019 at 6:27 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>>
>>>>>> I agree, it would be great to have a document to comment on.
>>>>>>
>>>>>> The main thing that stands out right now is that this is only for
>>>>>> PySpark and states that it will not be added to the Scala API. Why not make
>>>>>> this available since most of the work would be done?
>>>>>>
>>>>>> On Mon, Apr 15, 2019 at 7:50 AM Li Jin <ic...@gmail.com> wrote:
>>>>>>
>>>>>>> Thank you Chris, this looks great.
>>>>>>>
>>>>>>> Would you mind sharing a google doc version of the proposal? I believe
>>>>>>> that's the preferred way of discussing proposals (Other people please
>>>>>>> correct me if I am wrong).
>>>>>>>
>>>>>>> Li
>>>>>>>
>>>>>>> On Mon, Apr 15, 2019 at 8:20 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>>  As promised I’ve raised SPARK-27463 for this.
>>>>>>>>
>>>>>>>> All feedback welcome!
>>>>>>>>
>>>>>>>> Chris
>>>>>>>>
>>>>>>>> On 9 Apr 2019, at 13:22, Chris Martin <ch...@cmartinit.co.uk>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Thanks Bryan and Li, that is much appreciated.  Hopefully should
>>>>>>>> have the SPIP ready in the next couple of days.
>>>>>>>>
>>>>>>>> thanks,
>>>>>>>>
>>>>>>>> Chris
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't
>>>>>>>>> be too difficult to extend the current functionality to transfer multiple
>>>>>>>>> DataFrames.  For the SPIP, I would keep it more high-level and I don't
>>>>>>>>> think it's necessary to include details of the Python worker, we can hash
>>>>>>>>> that out after the SPIP is approved.
>>>>>>>>>
>>>>>>>>> Bryan
>>>>>>>>>
>>>>>>>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ic...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Chris, look forward to it.
>>>>>>>>>>
>>>>>>>>>> I think sending multiple dataframes to the python worker requires
>>>>>>>>>> some changes but shouldn't be too difficult. We can probably do something like:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>>>>>>>>
>>>>>>>>>> In:
>>>>>>>>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>>>>>>>>
>>>>>>>>>> And have ArrowPythonRunner take multiple input iterator/schema.
>>>>>>>>>>
>>>>>>>>>> Li
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Just to say, I really do think this is useful and am currently
>>>>>>>>>>> working on a SPIP to formally propose this. One concern I do have, however,
>>>>>>>>>>> is that the current arrow serialization code is tied to passing through a
>>>>>>>>>>> single dataframe as the udf parameter and so any modification to allow
>>>>>>>>>>> multiple dataframes may not be straightforward.  If anyone has any ideas as
>>>>>>>>>>> to how this might be achieved in an elegant manner I’d be happy to hear
>>>>>>>>>>> them!
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> Chris
>>>>>>>>>>>
>>>>>>>>>>> On 26 Feb 2019, at 14:55, Li Jin <ic...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thank you both for the reply. Chris and I have very similar use
>>>>>>>>>>> cases for cogroup.
>>>>>>>>>>>
>>>>>>>>>>> One of the goals for groupby apply + pandas UDF was to avoid
>>>>>>>>>>> things like collect list and reshaping data between Spark and Pandas.
>>>>>>>>>>> Cogroup feels very similar and can be an extension to the groupby apply +
>>>>>>>>>>> pandas UDF functionality.
>>>>>>>>>>>
>>>>>>>>>>> I wonder if any PMC/committers have any thoughts/opinions on
>>>>>>>>>>> this?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Just to add to this I’ve also implemented my own cogroup
>>>>>>>>>>>> previously and would welcome a cogroup for dataframes.
>>>>>>>>>>>>
>>>>>>>>>>>> My specific use case was that I had a large amount of time
>>>>>>>>>>>> series data. Spark has very limited support for time series (specifically
>>>>>>>>>>>> as-of joins), but pandas has good support.
>>>>>>>>>>>>
>>>>>>>>>>>> My solution was to take my two dataframes and perform a group
>>>>>>>>>>>> by and collect list on each. The resulting arrays could be passed into a
>>>>>>>>>>>> udf where they could be marshaled into a couple of pandas dataframes and
>>>>>>>>>>>> processed using pandas' excellent time series functionality.
>>>>>>>>>>>>
>>>>>>>>>>>> If cogroup was available natively on dataframes this would have
>>>>>>>>>>>> been a bit nicer. The ideal would have been some pandas udf version of
>>>>>>>>>>>> cogroup that gave me a pandas dataframe for each spark dataframe in the
>>>>>>>>>>>> cogroup!
>>>>>>>>>>>>
>>>>>>>>>>>> Chris
>>>>>>>>>>>>
>>>>>>>>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <
>>>>>>>>>>>> jonathan.winandy@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> For info, our team has defined its own cogroup on dataframes
>>>>>>>>>>>> in the past on different projects using different methods (rdd[row] based
>>>>>>>>>>>> or union all collect list based).
>>>>>>>>>>>>
>>>>>>>>>>>> I might be biased, but I find the approach very useful in projects
>>>>>>>>>>>> to simplify and speed up transformations, and to remove a lot of
>>>>>>>>>>>> intermediate stages (distinct + join => just cogroup).
>>>>>>>>>>>>
>>>>>>>>>>>> Plus, spark 2.4 introduced a lot of new operators for nested
>>>>>>>>>>>> data. That's a win!
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ic...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I am wondering whether other people have opinions or use cases
>>>>>>>>>>>>> for cogroup?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ic...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Alessandro,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the reply. I assume by "equi-join", you mean
>>>>>>>>>>>>>> "equality full outer join".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Two issues I see with an equality outer join are:
>>>>>>>>>>>>>> (1) An equality outer join will give n * m rows for each key (n
>>>>>>>>>>>>>> and m being the corresponding number of rows in df1 and df2 for each key)
>>>>>>>>>>>>>> (2) The user needs to do some extra processing to transform the n * m
>>>>>>>>>>>>>> rows back to the desired shape (two sub dataframes with n and m rows)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think a full outer join is an inefficient way to implement
>>>>>>>>>>>>>> cogroup. If the end goal is to have two separate dataframes for each key,
>>>>>>>>>>>>>> why join them first and then unjoin them?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>>>>>>>>>>>>> alessandro.solimando@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>> I fail to see how an equi-join on the key columns is
>>>>>>>>>>>>>>> different than the cogroup you propose.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think the accepted answer can shed some light:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Now you apply a udf on each iterable, one per key value
>>>>>>>>>>>>>>> (obtained with cogroup).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> You can achieve the same by:
>>>>>>>>>>>>>>> 1) join df1 and df2 on the key you want,
>>>>>>>>>>>>>>> 2) apply "groupby" on such key
>>>>>>>>>>>>>>> 3) finally apply a udaf (you can have a look here if you are
>>>>>>>>>>>>>>> not familiar with them
>>>>>>>>>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>>>>>>>>>>>>>> that will process each group "in isolation".
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> HTH,
>>>>>>>>>>>>>>> Alessandro
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ic...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We have been using Pyspark's groupby().apply() quite a bit
>>>>>>>>>>>>>>>> and it has been very helpful in integrating Spark with our existing
>>>>>>>>>>>>>>>> pandas-heavy libraries.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Recently, we have found more and more cases where
>>>>>>>>>>>>>>>> groupby().apply() is not sufficient - In some cases, we want to group two
>>>>>>>>>>>>>>>> dataframes by the same key, and apply a function which takes two
>>>>>>>>>>>>>>>> pd.DataFrame (also returns a pd.DataFrame) for each key. This feels very
>>>>>>>>>>>>>>>> much like the "cogroup" operation in the RDD API.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It would be great to be able to do something like this (not
>>>>>>>>>>>>>>>> an actual API, just to explain the use case):
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>>>>>>>>>     # pdf1 and pdf2 are the subsets of the original dataframes
>>>>>>>>>>>>>>>>     # that are associated with a particular key
>>>>>>>>>>>>>>>>     result = ...  # some code that uses pdf1 and pdf2
>>>>>>>>>>>>>>>>     return result
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have searched around the problem and some people have
>>>>>>>>>>>>>>>> suggested joining the tables first. However, it's often not the same
>>>>>>>>>>>>>>>> pattern, and it is hard to get it to work by using joins.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I wonder what people's thoughts are on this?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Li
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>>>
>>>>>
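
The wire layout Li suggests in the quoted message above, [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat], can be illustrated with a rough sketch. This is plain Python using only the standard struct module, with opaque byte strings standing in for Arrow-formatted data. The per-frame length prefix, the big-endian int32 encoding and the function names write_frames and read_frames are assumptions made for this illustration; they are not taken from ArrowPythonRunner or from the SPIP.

```python
import struct


def write_frames(payloads):
    """Serialize payloads as [count][len][bytes][len][bytes]... (big-endian int32)."""
    out = [struct.pack(">i", len(payloads))]
    for payload in payloads:
        out.append(struct.pack(">i", len(payload)))  # assumed length prefix
        out.append(payload)
    return b"".join(out)


def read_frames(buf):
    """Inverse of write_frames: recover the list of payloads from one buffer."""
    (count,) = struct.unpack_from(">i", buf, 0)
    offset = 4
    frames = []
    for _ in range(count):
        (size,) = struct.unpack_from(">i", buf, offset)
        offset += 4
        frames.append(buf[offset:offset + size])
        offset += size
    return frames


# Placeholder bytes; in the real proposal these would be Arrow-encoded dataframes.
payloads = [b"first-dataframe", b"second-dataframe"]
roundtrip = read_frames(write_frames(payloads))
```

The leading count is what lets the reader know how many dataframes to expect for a group before it starts decoding them.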

Re: Thoughts on dataframe cogroup?

Posted by Chris Martin <ch...@cmartinit.co.uk>.
Yes, totally agreed with Li here.

For clarity, I'm happy to do the work to implement this, but it would be
good to get feedback from the community in general and some of the Spark
committers in particular.

thanks,

Chris

On Wed, Apr 17, 2019 at 9:17 PM Li Jin <ic...@gmail.com> wrote:

> I have left some comments. This looks like a good proposal to me.
>
> As a heavy pyspark user, this is a pattern that we see over and over again,
> and I think it could be of pretty high value to other pyspark users as well.
> The fact that Chris and I came to the same idea sort of verifies my intuition.
> Also, this isn't really something new: RDD has had a cogroup function from
> very early on.
>
> With that being said, I'd like to call again for the community's feedback
> on the proposal.
>
> On Mon, Apr 15, 2019 at 4:57 PM Chris Martin <ch...@cmartinit.co.uk>
> wrote:
>
>> Ah sorry- I've updated the link which should give you access.  Can you
>> try again now?
>>
>> thanks,
>>
>> Chris
>>
>>
>>
>> On Mon, Apr 15, 2019 at 9:49 PM Li Jin <ic...@gmail.com> wrote:
>>
>>> Hi Chris,
>>>
>>> Thanks! The permission to the google doc is maybe not set up properly. I
>>> cannot view the doc by default.
>>>
>>> Li
>>>
>>> On Mon, Apr 15, 2019 at 3:58 PM Chris Martin <ch...@cmartinit.co.uk>
>>> wrote:
>>>
>>>> I've updated the jira so that the main body is now inside a google
>>>> doc.  Anyone should be able to comment- if you want/need write access
>>>> please drop me a mail and I can add you.
>>>>
>>>> Ryan- regarding your specific point regarding why I'm not proposing to
>>>> add this to the Scala API, I think the main point is that Scala users can
>>>> already use Cogroup for Datasets.  For Scala this is probably a better
>>>> solution as (as far as I know) there is no Scala DataFrame library that
>>>> could be used in place of Pandas for manipulating  local DataFrames. As a
>>>> result you'd probably be left with dealing with Iterators of Row objects,
>>>> which almost certainly isn't what you'd want. This is similar to the
>>>> existing grouped map Pandas Udfs for which there is no equivalent Scala Api.
>>>>
>>>> I do think there might be a place for allowing a (Scala) DataSet
>>>> Cogroup to take some sort of grouping expression as the grouping key  (this
>>>> would mean that you wouldn't have to marshal the key into a JVM object and
>>>> could possibly lend itself to some catalyst optimisations) but I don't
>>>> think that this should be done as part of this SPIP.
>>>>
>>>> thanks,
>>>>
>>>> Chris
>>>>
>>>> On Mon, Apr 15, 2019 at 6:27 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>
>>>>> I agree, it would be great to have a document to comment on.
>>>>>
>>>>> The main thing that stands out right now is that this is only for
>>>>> PySpark and states that it will not be added to the Scala API. Why not make
>>>>> this available since most of the work would be done?
>>>>>
>>>>> On Mon, Apr 15, 2019 at 7:50 AM Li Jin <ic...@gmail.com> wrote:
>>>>>
>>>>>> Thank you Chris, this looks great.
>>>>>>
>>>>>> Would you mind sharing a google doc version of the proposal? I believe
>>>>>> that's the preferred way of discussing proposals (Other people please
>>>>>> correct me if I am wrong).
>>>>>>
>>>>>> Li
>>>>>>
>>>>>> On Mon, Apr 15, 2019 at 8:20 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>>  As promised I’ve raised SPARK-27463 for this.
>>>>>>>
>>>>>>> All feedback welcome!
>>>>>>>
>>>>>>> Chris
>>>>>>>
>>>>>>> On 9 Apr 2019, at 13:22, Chris Martin <ch...@cmartinit.co.uk> wrote:
>>>>>>>
>>>>>>> Thanks Bryan and Li, that is much appreciated.  Hopefully should
>>>>>>> have the SPIP ready in the next couple of days.
>>>>>>>
>>>>>>> thanks,
>>>>>>>
>>>>>>> Chris
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cu...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't
>>>>>>>> be too difficult to extend the current functionality to transfer multiple
>>>>>>>> DataFrames.  For the SPIP, I would keep it more high-level and I don't
>>>>>>>> think it's necessary to include details of the Python worker, we can hash
>>>>>>>> that out after the SPIP is approved.
>>>>>>>>
>>>>>>>> Bryan
>>>>>>>>
>>>>>>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ic...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Chris, look forward to it.
>>>>>>>>>
>>>>>>>>> I think sending multiple dataframes to the python worker requires
>>>>>>>>> some changes but shouldn't be too difficult. We can probably do something like:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>>>>>>>
>>>>>>>>> In:
>>>>>>>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>>>>>>>
>>>>>>>>> And have ArrowPythonRunner take multiple input iterator/schema.
>>>>>>>>>
>>>>>>>>> Li
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Just to say, I really do think this is useful and am currently
>>>>>>>>>> working on a SPIP to formally propose this. One concern I do have, however,
>>>>>>>>>> is that the current arrow serialization code is tied to passing through a
>>>>>>>>>> single dataframe as the udf parameter and so any modification to allow
>>>>>>>>>> multiple dataframes may not be straightforward.  If anyone has any ideas as
>>>>>>>>>> to how this might be achieved in an elegant manner I’d be happy to hear
>>>>>>>>>> them!
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Chris
>>>>>>>>>>
>>>>>>>>>> On 26 Feb 2019, at 14:55, Li Jin <ic...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Thank you both for the reply. Chris and I have very similar use
>>>>>>>>>> cases for cogroup.
>>>>>>>>>>
>>>>>>>>>> One of the goals for groupby apply + pandas UDF was to avoid
>>>>>>>>>> things like collect list and reshaping data between Spark and Pandas.
>>>>>>>>>> Cogroup feels very similar and can be an extension to the groupby apply +
>>>>>>>>>> pandas UDF functionality.
>>>>>>>>>>
>>>>>>>>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>>>>
>>>>>>>>>>> Just to add to this I’ve also implemented my own cogroup
>>>>>>>>>>> previously and would welcome a cogroup for dataframes.
>>>>>>>>>>>
>>>>>>>>>>> My specific use case was that I had a large amount of time
>>>>>>>>>>> series data. Spark has very limited support for time series (specifically
>>>>>>>>>>> as-of joins), but pandas has good support.
>>>>>>>>>>>
>>>>>>>>>>> My solution was to take my two dataframes and perform a group by
>>>>>>>>>>> and collect list on each. The resulting arrays could be passed into a udf
>>>>>>>>>>> where they could be marshaled into a couple of pandas dataframes and
>>>>>>>>>>> processed using pandas' excellent time series functionality.
>>>>>>>>>>>
>>>>>>>>>>> If cogroup was available natively on dataframes this would have
>>>>>>>>>>> been a bit nicer. The ideal would have been some pandas udf version of
>>>>>>>>>>> cogroup that gave me a pandas dataframe for each spark dataframe in the
>>>>>>>>>>> cogroup!
>>>>>>>>>>>
>>>>>>>>>>> Chris
>>>>>>>>>>>
>>>>>>>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <
>>>>>>>>>>> jonathan.winandy@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> For info, our team has defined its own cogroup on dataframes
>>>>>>>>>>> in the past on different projects using different methods (rdd[row] based
>>>>>>>>>>> or union all collect list based).
>>>>>>>>>>>
>>>>>>>>>>> I might be biased, but I find the approach very useful in projects
>>>>>>>>>>> to simplify and speed up transformations, and to remove a lot of
>>>>>>>>>>> intermediate stages (distinct + join => just cogroup).
>>>>>>>>>>>
>>>>>>>>>>> Plus, spark 2.4 introduced a lot of new operators for nested data.
>>>>>>>>>>> That's a win!
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ic...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I am wondering whether other people have opinions or use cases for cogroup?
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ic...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Alessandro,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the reply. I assume by "equi-join", you mean
>>>>>>>>>>>>> "equality full outer join".
>>>>>>>>>>>>>
>>>>>>>>>>>>> Two issues I see with an equality outer join are:
>>>>>>>>>>>>> (1) An equality outer join will give n * m rows for each key (n and
>>>>>>>>>>>>> m being the corresponding number of rows in df1 and df2 for each key)
>>>>>>>>>>>>> (2) The user needs to do some extra processing to transform the n * m
>>>>>>>>>>>>> rows back to the desired shape (two sub dataframes with n and m rows)
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think a full outer join is an inefficient way to implement
>>>>>>>>>>>>> cogroup. If the end goal is to have two separate dataframes for each key,
>>>>>>>>>>>>> why join them first and then unjoin them?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>>>>>>>>>>>> alessandro.solimando@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>> I fail to see how an equi-join on the key columns is
>>>>>>>>>>>>>> different than the cogroup you propose.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think the accepted answer can shed some light:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Now you apply a udf on each iterable, one per key value
>>>>>>>>>>>>>> (obtained with cogroup).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> You can achieve the same by:
>>>>>>>>>>>>>> 1) join df1 and df2 on the key you want,
>>>>>>>>>>>>>> 2) apply "groupby" on such key
>>>>>>>>>>>>>> 3) finally apply a udaf (you can have a look here if you are
>>>>>>>>>>>>>> not familiar with them
>>>>>>>>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>>>>>>>>>>>>> that will process each group "in isolation".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> HTH,
>>>>>>>>>>>>>> Alessandro
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ic...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We have been using Pyspark's groupby().apply() quite a bit
>>>>>>>>>>>>>>> and it has been very helpful in integrating Spark with our existing
>>>>>>>>>>>>>>> pandas-heavy libraries.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Recently, we have found more and more cases where
>>>>>>>>>>>>>>> groupby().apply() is not sufficient - In some cases, we want to group two
>>>>>>>>>>>>>>> dataframes by the same key, and apply a function which takes two
>>>>>>>>>>>>>>> pd.DataFrame (also returns a pd.DataFrame) for each key. This feels very
>>>>>>>>>>>>>>> much like the "cogroup" operation in the RDD API.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It would be great to be able to do something like this (not
>>>>>>>>>>>>>>> an actual API, just to explain the use case):
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>>>>>>>>     # pdf1 and pdf2 are the subsets of the original dataframes
>>>>>>>>>>>>>>>     # that are associated with a particular key
>>>>>>>>>>>>>>>     result = ...  # some code that uses pdf1 and pdf2
>>>>>>>>>>>>>>>     return result
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have searched around the problem and some people have
>>>>>>>>>>>>>>> suggested joining the tables first. However, it's often not the same
>>>>>>>>>>>>>>> pattern, and it is hard to get it to work by using joins.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I wonder what people's thoughts are on this?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Li
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
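
Chris's quoted use case, as-of joins on time series, is the kind of per-key logic that a cogrouped function would run once it has both groups in hand. The following is a minimal sketch of a backward as-of join in plain Python (standard library only), standing in for what pandas provides as merge_asof. The asof_join helper and the sample rows are hypothetical and only for illustration.

```python
from bisect import bisect_right


def asof_join(left, right, on):
    """Attach to each left row the last right row whose `on` value is <= the left row's.

    Both inputs must already be sorted by `on`. A left row with no earlier
    right row keeps only its own fields.
    """
    right_keys = [r[on] for r in right]
    out = []
    for l_row in left:
        i = bisect_right(right_keys, l_row[on])
        match = right[i - 1] if i > 0 else {}
        out.append({**match, **l_row})  # left values win on shared column names
    return out


# Hypothetical data for a single key: trades matched to the latest earlier quote.
trades = [{"t": 3, "qty": 10}, {"t": 7, "qty": 5}]
quotes = [{"t": 1, "px": 100.0}, {"t": 5, "px": 101.0}, {"t": 8, "px": 102.0}]

joined = asof_join(trades, quotes, "t")
```

With a dataframe cogroup, a function like this would receive the trades and quotes for one key at a time, without the collect-list and marshaling steps described in the quoted workaround.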

Re: Thoughts on dataframe cogroup?

Posted by Li Jin <ic...@gmail.com>.
I have left some comments. This looks like a good proposal to me.

As a heavy pyspark user, this is a pattern that we see over and over again,
and I think it could be of pretty high value to other pyspark users as well.
The fact that Chris and I came to the same idea sort of verifies my intuition.
Also, this isn't really something new: RDD has had a cogroup function from
very early on.

With that being said, I'd like to call again for the community's feedback
on the proposal.

On Mon, Apr 15, 2019 at 4:57 PM Chris Martin <ch...@cmartinit.co.uk> wrote:

> Ah sorry- I've updated the link which should give you access.  Can you try
> again now?
>
> thanks,
>
> Chris
>
>
>
> On Mon, Apr 15, 2019 at 9:49 PM Li Jin <ic...@gmail.com> wrote:
>
>> Hi Chris,
>>
>> Thanks! The permission to the google doc is maybe not set up properly. I
>> cannot view the doc by default.
>>
>> Li
>>
>> On Mon, Apr 15, 2019 at 3:58 PM Chris Martin <ch...@cmartinit.co.uk>
>> wrote:
>>
>>> I've updated the jira so that the main body is now inside a google doc.
>>> Anyone should be able to comment- if you want/need write access please drop
>>> me a mail and I can add you.
>>>
>>> Ryan- regarding your specific point regarding why I'm not proposing to
>>> add this to the Scala API, I think the main point is that Scala users can
>>> already use Cogroup for Datasets.  For Scala this is probably a better
>>> solution as (as far as I know) there is no Scala DataFrame library that
>>> could be used in place of Pandas for manipulating  local DataFrames. As a
>>> result you'd probably be left with dealing with Iterators of Row objects,
>>> which almost certainly isn't what you'd want. This is similar to the
>>> existing grouped map Pandas Udfs for which there is no equivalent Scala Api.
>>>
>>> I do think there might be a place for allowing a (Scala) DataSet Cogroup
>>> to take some sort of grouping expression as the grouping key  (this would
>>> mean that you wouldn't have to marshal the key into a JVM object and could
>>> possibly lend itself to some catalyst optimisations) but I don't think that
>>> this should be done as part of this SPIP.
>>>
>>> thanks,
>>>
>>> Chris
>>>
>>> On Mon, Apr 15, 2019 at 6:27 PM Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> I agree, it would be great to have a document to comment on.
>>>>
>>>> The main thing that stands out right now is that this is only for
>>>> PySpark and states that it will not be added to the Scala API. Why not make
>>>> this available since most of the work would be done?
>>>>
>>>> On Mon, Apr 15, 2019 at 7:50 AM Li Jin <ic...@gmail.com> wrote:
>>>>
>>>>> Thank you Chris, this looks great.
>>>>>
>>>>> Would you mind sharing a google doc version of the proposal? I believe
>>>>> that's the preferred way of discussing proposals (Other people please
>>>>> correct me if I am wrong).
>>>>>
>>>>> Li
>>>>>
>>>>> On Mon, Apr 15, 2019 at 8:20 AM <ch...@cmartinit.co.uk> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>  As promised I’ve raised SPARK-27463 for this.
>>>>>>
>>>>>> All feedback welcome!
>>>>>>
>>>>>> Chris
>>>>>>
>>>>>> On 9 Apr 2019, at 13:22, Chris Martin <ch...@cmartinit.co.uk> wrote:
>>>>>>
>>>>>> Thanks Bryan and Li, that is much appreciated.  Hopefully should have
>>>>>> the SPIP ready in the next couple of days.
>>>>>>
>>>>>> thanks,
>>>>>>
>>>>>> Chris
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't
>>>>>>> be too difficult to extend the current functionality to transfer multiple
>>>>>>> DataFrames.  For the SPIP, I would keep it more high-level and I don't
>>>>>>> think it's necessary to include details of the Python worker, we can hash
>>>>>>> that out after the SPIP is approved.
>>>>>>>
>>>>>>> Bryan
>>>>>>>
>>>>>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ic...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Chris, look forward to it.
>>>>>>>>
>>>>>>>> I think sending multiple dataframes to the python worker requires
>>>>>>>> some changes but shouldn't be too difficult. We can probably do something like:
>>>>>>>>
>>>>>>>>
>>>>>>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>>>>>>
>>>>>>>> In:
>>>>>>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>>>>>>
>>>>>>>> And have ArrowPythonRunner take multiple input iterator/schema.
>>>>>>>>
>>>>>>>> Li
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Just to say, I really do think this is useful and am currently
>>>>>>>>> working on a SPIP to formally propose this. One concern I do have, however,
>>>>>>>>> is that the current arrow serialization code is tied to passing through a
>>>>>>>>> single dataframe as the udf parameter and so any modification to allow
>>>>>>>>> multiple dataframes may not be straightforward.  If anyone has any ideas as
>>>>>>>>> to how this might be achieved in an elegant manner I’d be happy to hear
>>>>>>>>> them!
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Chris
>>>>>>>>>
>>>>>>>>> On 26 Feb 2019, at 14:55, Li Jin <ic...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Thank you both for the reply. Chris and I have very similar use
>>>>>>>>> cases for cogroup.
>>>>>>>>>
>>>>>>>>> One of the goals for groupby apply + pandas UDF was to avoid
>>>>>>>>> things like collect list and reshaping data between Spark and Pandas.
>>>>>>>>> Cogroup feels very similar and can be an extension to the groupby apply +
>>>>>>>>> pandas UDF functionality.
>>>>>>>>>
>>>>>>>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>>>>>>>
>>>>>>>>> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>>>
>>>>>>>>>> Just to add to this I’ve also implemented my own cogroup
>>>>>>>>>> previously and would welcome a cogroup for dataframes.
>>>>>>>>>>
>>>>>>>>>> My specific use case was that I had a large amount of time series
>>>>>>>>>> data. Spark has very limited support for time series (specifically as-of
>>>>>>>>>> joins), but pandas has good support.
>>>>>>>>>>
>>>>>>>>>> My solution was to take my two dataframes and perform a group by
>>>>>>>>>> and collect list on each. The resulting arrays could be passed into a udf
>>>>>>>>>> where they could be marshaled into a couple of pandas dataframes and
>>>>>>>>>> processed using pandas' excellent time series functionality.
>>>>>>>>>>
>>>>>>>>>> If cogroup was available natively on dataframes this would have
>>>>>>>>>> been a bit nicer. The ideal would have been some pandas udf version of
>>>>>>>>>> cogroup that gave me a pandas dataframe for each spark dataframe in the
>>>>>>>>>> cogroup!
>>>>>>>>>>
>>>>>>>>>> Chris
>>>>>>>>>>
>>>>>>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <
>>>>>>>>>> jonathan.winandy@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> For info, our team has defined its own cogroup on dataframes
>>>>>>>>>> in the past on different projects using different methods (rdd[row] based
>>>>>>>>>> or union all collect list based).
>>>>>>>>>>
>>>>>>>>>> I might be biased, but I find the approach very useful in projects
>>>>>>>>>> to simplify and speed up transformations, and remove a lot of intermediate
>>>>>>>>>> stages (distinct + join => just cogroup).
>>>>>>>>>>
>>>>>>>>>> Plus spark 2.4 introduced a lot of new operators for nested data.
>>>>>>>>>> That's a win!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ic...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I am wondering whether other people have opinions/use cases on cogroup?
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ic...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Alessandro,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the reply. I assume by "equi-join", you mean
>>>>>>>>>>>> "equality full outer join".
>>>>>>>>>>>>
>>>>>>>>>>>> Two issues I see with an equi outer join are:
>>>>>>>>>>>> (1) an equi outer join will give n * m rows for each key (n and
>>>>>>>>>>>> m being the corresponding number of rows in df1 and df2 for each key)
>>>>>>>>>>>> (2) User needs to do some extra processing to transform n * m
>>>>>>>>>>>> back to the desired shape (two sub dataframes with n and m rows)
>>>>>>>>>>>>
>>>>>>>>>>>> I think full outer join is an inefficient way to implement
>>>>>>>>>>>> cogroup. If the end goal is to have two separate dataframes for each key,
>>>>>>>>>>>> why join them first and then unjoin them?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>>>>>>>>>>> alessandro.solimando@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>> I fail to see how an equi-join on the key columns is different
>>>>>>>>>>>>> than the cogroup you propose.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think the accepted answer can shed some light:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>>>>>>
>>>>>>>>>>>>> Now you apply an udf on each iterable, one per key value
>>>>>>>>>>>>> (obtained with cogroup).
>>>>>>>>>>>>>
>>>>>>>>>>>>> You can achieve the same by:
>>>>>>>>>>>>> 1) join df1 and df2 on the key you want,
>>>>>>>>>>>>> 2) apply "groupby" on such key
>>>>>>>>>>>>> 3) finally apply a udaf (you can have a look here if you are
>>>>>>>>>>>>> not familiar with them
>>>>>>>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>>>>>>>>>>>> that will process each group "in isolation".
>>>>>>>>>>>>>
>>>>>>>>>>>>> HTH,
>>>>>>>>>>>>> Alessandro
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ic...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We have been using Pyspark's groupby().apply() quite a bit
>>>>>>>>>>>>>> and it has been very helpful in integrating Spark with our existing
>>>>>>>>>>>>>> pandas-heavy libraries.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Recently, we have found more and more cases where
>>>>>>>>>>>>>> groupby().apply() is not sufficient - In some cases, we want to group two
>>>>>>>>>>>>>> dataframes by the same key, and apply a function which takes two
>>>>>>>>>>>>>> pd.DataFrame (also returns a pd.DataFrame) for each key. This feels very
>>>>>>>>>>>>>> much like the "cogroup" operation in the RDD API.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It would be great to be able to do sth like this: (not actual
>>>>>>>>>>>>>> API, just to explain the use case):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>>>>>>>      # pdf1 and pdf2 are the subsets of the original
>>>>>>>>>>>>>>      # dataframes that are associated with a particular key
>>>>>>>>>>>>>>      result = ... # some code that uses pdf1 and pdf2
>>>>>>>>>>>>>>      return result
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> df3  = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have searched around the problem and some people have
>>>>>>>>>>>>>> suggested joining the tables first. However, it's often not the same
>>>>>>>>>>>>>> pattern and hard to get it to work by using joins.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I wonder what people's thoughts are on this?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Li
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>

Re: Thoughts on dataframe cogroup?

Posted by Chris Martin <ch...@cmartinit.co.uk>.
Ah sorry- I've updated the link which should give you access.  Can you try
again now?

thanks,

Chris



On Mon, Apr 15, 2019 at 9:49 PM Li Jin <ic...@gmail.com> wrote:

> Hi Chris,
>
> Thanks! The permissions on the google doc may not be set up properly. I
> cannot view the doc by default.
>
> Li

Re: Thoughts on dataframe cogroup?

Posted by Li Jin <ic...@gmail.com>.
Hi Chris,

Thanks! The permissions on the google doc may not be set up properly. I
cannot view the doc by default.

Li

On Mon, Apr 15, 2019 at 3:58 PM Chris Martin <ch...@cmartinit.co.uk> wrote:

> I've updated the jira so that the main body is now inside a google doc.
> Anyone should be able to comment- if you want/need write access please drop
> me a mail and I can add you.
>
> Ryan- regarding your specific point regarding why I'm not proposing to add
> this to the Scala API, I think the main point is that Scala users can
> already use Cogroup for Datasets.  For Scala this is probably a better
> solution as (as far as I know) there is no Scala DataFrame library that
> could be used in place of Pandas for manipulating  local DataFrames. As a
> result you'd probably be left with dealing with Iterators of Row objects,
> which almost certainly isn't what you'd want. This is similar to the
> existing grouped map Pandas Udfs for which there is no equivalent Scala Api.
>
> I do think there might be a place for allowing a (Scala) DataSet Cogroup
> to take some sort of grouping expression as the grouping key  (this would
> mean that you wouldn't have to marshal the key into a JVM object and could
> possibly lend itself to some catalyst optimisations) but I don't think that
> this should be done as part of this SPIP.
>
> thanks,
>
> Chris

Re: Thoughts on dataframe cogroup?

Posted by Chris Martin <ch...@cmartinit.co.uk>.
I've updated the jira so that the main body is now inside a google doc.
Anyone should be able to comment- if you want/need write access please drop
me a mail and I can add you.

Ryan, on your specific point about why I'm not proposing to add this to
the Scala API: I think the main point is that Scala users can already use
cogroup for Datasets.  For Scala this is probably a better solution as (as
far as I know) there is no Scala DataFrame library that could be used in
place of pandas for manipulating local DataFrames. As a result you'd
probably be left dealing with Iterators of Row objects, which almost
certainly isn't what you'd want. This is similar to the existing grouped
map pandas UDFs, for which there is no equivalent Scala API.
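
To make the "iterators of rows" point concrete, here is a minimal
pure-Python sketch of what a cogroup hands to a user function: one call per
key, with the rows from each side kept separate. It is only an illustration
of the semantics, not Spark code; cogroup_rows, Row, trades and quotes are
hypothetical names made up for this example.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, Any]  # hypothetical stand-in for a Spark Row


def cogroup_rows(
    left: Iterable[Row],
    right: Iterable[Row],
    key: Callable[[Row], Any],
) -> Iterator[Tuple[Any, List[Row], List[Row]]]:
    """Yield (key, left_rows, right_rows) once per distinct key."""
    left_groups: Dict[Any, List[Row]] = defaultdict(list)
    right_groups: Dict[Any, List[Row]] = defaultdict(list)
    for row in left:
        left_groups[key(row)].append(row)
    for row in right:
        right_groups[key(row)].append(row)
    # A key present on only one side still gets a group; the other side is empty.
    for k in sorted(set(left_groups) | set(right_groups)):
        yield k, left_groups.get(k, []), right_groups.get(k, [])


# Made-up sample data: two "tables" sharing an "id" key.
trades = [{"id": 1, "t": 10}, {"id": 1, "t": 20}, {"id": 2, "t": 15}]
quotes = [{"id": 1, "t": 9}, {"id": 3, "t": 12}]

groups = list(cogroup_rows(trades, quotes, key=lambda r: r["id"]))
for k, left_rows, right_rows in groups:
    print(k, len(left_rows), len(right_rows))
```

With bare rows like these, the user function has to do all of the
per-group work by hand, which is the part pandas would otherwise take care
of on the Python side.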

I do think there might be a place for allowing a (Scala) Dataset cogroup to
take some sort of grouping expression as the grouping key (this would mean
that you wouldn't have to marshal the key into a JVM object, and it could
possibly lend itself to some Catalyst optimisations), but I don't think
this should be done as part of this SPIP.

thanks,

Chris

On Mon, Apr 15, 2019 at 6:27 PM Ryan Blue <rb...@netflix.com> wrote:

> I agree, it would be great to have a document to comment on.
>
> The main thing that stands out right now is that this is only for PySpark
> and states that it will not be added to the Scala API. Why not make this
> available since most of the work would be done?
>
> On Mon, Apr 15, 2019 at 7:50 AM Li Jin <ic...@gmail.com> wrote:
>
>> Thank you Chris, this looks great.
>>
>> Would you mind sharing a google doc version of the proposal? I believe
>> that's the preferred way of discussing proposals (Other people please
>> correct me if I am wrong).
>>
>> Li
>>
>> On Mon, Apr 15, 2019 at 8:20 AM <ch...@cmartinit.co.uk> wrote:
>>
>>> Hi,
>>>
>>>  As promised I’ve raised SPARK-27463 for this.
>>>
>>> All feedback welcome!
>>>
>>> Chris
>>>
>>> On 9 Apr 2019, at 13:22, Chris Martin <ch...@cmartinit.co.uk> wrote:
>>>
>>> Thanks Bryan and Li, that is much appreciated.  Hopefully should have
>>> the SPIP ready in the next couple of days.
>>>
>>> thanks,
>>>
>>> Chris
>>>
>>>
>>>
>>>
>>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cu...@gmail.com> wrote:
>>>
>>>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't be
>>>> too difficult to extend the current functionality to transfer multiple
>>>> DataFrames.  For the SPIP, I would keep it more high-level and I don't
>>>> think it's necessary to include details of the Python worker, we can hash
>>>> that out after the SPIP is approved.
>>>>
>>>> Bryan
>>>>
>>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ic...@gmail.com> wrote:
>>>>
>>>>> Thanks Chris, look forward to it.
>>>>>
>>>>> I think sending multiple dataframes to the python worker requires some
>>>>> changes but shouldn't be too difficult. We can probably do sth like:
>>>>>
>>>>>
>>>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>>>
>>>>> In:
>>>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>>>
>>>>> And have ArrowPythonRunner take multiple input iterator/schema.
>>>>>
>>>>> Li
>>>>>
>>>>>
>>>>> On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Just to say, I really do think this is useful and am currently
>>>>>> working on a SPIP to formally propose this. One concern I do have, however,
>>>>>> is that the current arrow serialization code is tied to passing through a
>>>>>> single dataframe as the udf parameter and so any modification to allow
>>>>>> multiple dataframes may not be straightforward.  If anyone has any ideas as
>>>>>> to how this might be achieved in an elegant manner I’d be happy to hear
>>>>>> them!
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Chris
>>>>>>
>>>>>> On 26 Feb 2019, at 14:55, Li Jin <ic...@gmail.com> wrote:
>>>>>>
>>>>>> Thank you both for the reply. Chris and I have very similar use cases
>>>>>> for cogroup.
>>>>>>
>>>>>> One of the goals for groupby apply + pandas UDF was to avoid things
>>>>>> like collect list and reshaping data between Spark and Pandas. Cogroup
>>>>>> feels very similar and can be an extension to the groupby apply + pandas
>>>>>> UDF functionality.
>>>>>>
>>>>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>>>>
>>>>>> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>
>>>>>>> Just to add to this I’ve also implemented my own cogroup previously
>>>>>>> and would welcome a cogroup for dataframes.
>>>>>>>
>>>>>>> My specific use case was that I had a large amount of time series
>>>>>>> data. Spark has very limited support for time series (specifically as-of
>>>>>>> joins), but pandas has good support.
>>>>>>>
>>>>>>> My solution was to take my two dataframes and perform a group by and
>>>>>>> collect list on each. The resulting arrays could be passed into a udf where
>>>>>>> they could be marshaled into a couple of pandas dataframes and processed
>>>>>>> using pandas' excellent time series functionality.
>>>>>>>
>>>>>>> If cogroup was available natively on dataframes this would have been
>>>>>>> a bit nicer. The ideal would have been some pandas udf version of cogroup
>>>>>>> that gave me a pandas dataframe for each spark dataframe in the cogroup!
>>>>>>>
>>>>>>> Chris
>>>>>>>
>>>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <
>>>>>>> jonathan.winandy@gmail.com> wrote:
>>>>>>>
>>>>>>> For info, our team has defined its own cogroup on dataframes in
>>>>>>> the past on different projects using different methods (rdd[row] based or
>>>>>>> union all collect list based).
>>>>>>>
>>>>>>> I might be biased, but I find the approach very useful in projects to
>>>>>>> simplify and speed up transformations, and to remove a lot of intermediate
>>>>>>> stages (distinct + join => just cogroup).
>>>>>>>
>>>>>>> Plus Spark 2.4 introduced a lot of new operators for nested data.
>>>>>>> That's a win!
>>>>>>>
>>>>>>>
>>>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ic...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I am wondering: do other people have opinions or use cases for cogroup?
>>>>>>>>
>>>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ic...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Alessandro,
>>>>>>>>>
>>>>>>>>> Thanks for the reply. I assume by "equi-join", you mean "equality
>>>>>>>>> full outer join".
>>>>>>>>>
>>>>>>>>> Two issues I see with an equality outer join are:
>>>>>>>>> (1) an equality outer join will give n * m rows for each key (n and m
>>>>>>>>> being the corresponding number of rows in df1 and df2 for each key)
>>>>>>>>> (2) the user needs to do some extra processing to transform the n * m
>>>>>>>>> rows back to the desired shape (two sub dataframes with n and m rows)
>>>>>>>>>
>>>>>>>>> I think a full outer join is an inefficient way to implement
>>>>>>>>> cogroup. If the end goal is to have two separate dataframes for each key,
>>>>>>>>> why join them first and then unjoin them?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>>>>>>>> alessandro.solimando@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>> I fail to see how an equi-join on the key columns is different
>>>>>>>>>> than the cogroup you propose.
>>>>>>>>>>
>>>>>>>>>> I think the accepted answer can shed some light:
>>>>>>>>>>
>>>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>>>
>>>>>>>>>> Now you apply an udf on each iterable, one per key value
>>>>>>>>>> (obtained with cogroup).
>>>>>>>>>>
>>>>>>>>>> You can achieve the same by:
>>>>>>>>>> 1) join df1 and df2 on the key you want,
>>>>>>>>>> 2) apply "groupby" on such key
>>>>>>>>>> 3) finally apply a udaf (you can have a look here if you are not
>>>>>>>>>> familiar with them
>>>>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>>>>>>>>> that will process each group "in isolation".
>>>>>>>>>>
>>>>>>>>>> HTH,
>>>>>>>>>> Alessandro
>>>>>>>>>>
>>>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ic...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> We have been using Pyspark's groupby().apply() quite a bit and
>>>>>>>>>>> it has been very helpful in integrating Spark with our existing
>>>>>>>>>>> pandas-heavy libraries.
>>>>>>>>>>>
>>>>>>>>>>> Recently, we have found more and more cases where
>>>>>>>>>>> groupby().apply() is not sufficient - In some cases, we want to group two
>>>>>>>>>>> dataframes by the same key, and apply a function which takes two
>>>>>>>>>>> pd.DataFrame (also returns a pd.DataFrame) for each key. This feels very
>>>>>>>>>>> much like the "cogroup" operation in the RDD API.
>>>>>>>>>>>
>>>>>>>>>>> It would be great to be able to do something like this (not actual
>>>>>>>>>>> API, just to explain the use case):
>>>>>>>>>>>
>>>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>>>>      # pdf1 and pdf2 are the subsets of the original dataframes
>>>>>>>>>>>      # that are associated with a particular key
>>>>>>>>>>>      result = ... # some code that uses pdf1 and pdf2
>>>>>>>>>>>      return result
>>>>>>>>>>>
>>>>>>>>>>> df3  = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>>>
>>>>>>>>>>> I have searched around the problem and some people have
>>>>>>>>>>> suggested joining the tables first. However, it's often not the same
>>>>>>>>>>> pattern, and it's hard to get it to work using joins.
>>>>>>>>>>>
>>>>>>>>>>> I wonder what people's thoughts on this are.
>>>>>>>>>>>
>>>>>>>>>>> Li
>>>>>>>>>>>
>>>>>>>>>>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Re: Thoughts on dataframe cogroup?

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
I agree, it would be great to have a document to comment on.

The main thing that stands out right now is that the proposal is only for
PySpark and states that it will not be added to the Scala API. Why not make
it available there as well, since most of the work would already be done?

On Mon, Apr 15, 2019 at 7:50 AM Li Jin <ic...@gmail.com> wrote:

> Thank you Chris, this looks great.
>
> Would you mind sharing a Google Doc version of the proposal? I believe
> that's the preferred way of discussing proposals (other people, please
> correct me if I am wrong).
>
> Li
>
> On Mon, Apr 15, 2019 at 8:20 AM <ch...@cmartinit.co.uk> wrote:
>
>> Hi,
>>
>>  As promised I’ve raised SPARK-27463 for this.
>>
>> All feedback welcome!
>>
>> Chris
>>
>> On 9 Apr 2019, at 13:22, Chris Martin <ch...@cmartinit.co.uk> wrote:
>>
>> Thanks Bryan and Li, that is much appreciated.  Hopefully I should have the
>> SPIP ready in the next couple of days.
>>
>> thanks,
>>
>> Chris
>>
>>
>>
>>
>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cu...@gmail.com> wrote:
>>
>>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't be
>>> too difficult to extend the current functionality to transfer multiple
>>> DataFrames.  For the SPIP, I would keep it more high-level; I don't
>>> think it's necessary to include details of the Python worker. We can hash
>>> that out after the SPIP is approved.
>>>
>>> Bryan
>>>
>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ic...@gmail.com> wrote:
>>>
>>>> Thanks Chris, look forward to it.
>>>>
>>>> I think sending multiple dataframes to the python worker requires some
>>>> changes but shouldn't be too difficult. We can probably do something like:
>>>>
>>>>
>>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>>
>>>> In:
>>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>>
>>>> And have ArrowPythonRunner take multiple input iterator/schema.
>>>>
>>>> Li
>>>>
>>>>
>>>> On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Just to say, I really do think this is useful and am currently working
>>>>> on a SPIP to formally propose this. One concern I do have, however, is that
>>>>> the current arrow serialization code is tied to passing through a single
>>>>> dataframe as the udf parameter and so any modification to allow multiple
>>>>> dataframes may not be straightforward.  If anyone has any ideas as to how
>>>>> this might be achieved in an elegant manner I’d be happy to hear them!
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Chris
>>>>>
>>>>> On 26 Feb 2019, at 14:55, Li Jin <ic...@gmail.com> wrote:
>>>>>
>>>>> Thank you both for the reply. Chris and I have very similar use cases
>>>>> for cogroup.
>>>>>
>>>>> One of the goals for groupby apply + pandas UDF was to avoid things
>>>>> like collect list and reshaping data between Spark and Pandas. Cogroup
>>>>> feels very similar and can be an extension to the groupby apply + pandas
>>>>> UDF functionality.
>>>>>
>>>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>>>
>>>>> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>>>>>
>>>>>> Just to add to this I’ve also implemented my own cogroup previously
>>>>>> and would welcome a cogroup for dataframes.
>>>>>>
>>>>>> My specific use case was that I had a large amount of time series
>>>>>> data. Spark has very limited support for time series (specifically as-of
>>>>>> joins), but pandas has good support.
>>>>>>
>>>>>> My solution was to take my two dataframes and perform a group by and
>>>>>> collect list on each. The resulting arrays could be passed into a udf where
>>>>>> they could be marshaled into a couple of pandas dataframes and processed
>>>>>> using pandas' excellent time series functionality.
>>>>>>
>>>>>> If cogroup was available natively on dataframes this would have been
>>>>>> a bit nicer. The ideal would have been some pandas udf version of cogroup
>>>>>> that gave me a pandas dataframe for each spark dataframe in the cogroup!
>>>>>>
>>>>>> Chris
>>>>>>
>>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <
>>>>>> jonathan.winandy@gmail.com> wrote:
>>>>>>
>>>>>> For info, our team has defined its own cogroup on dataframes in
>>>>>> the past on different projects using different methods (rdd[row] based or
>>>>>> union all collect list based).
>>>>>>
>>>>>> I might be biased, but I find the approach very useful in projects to
>>>>>> simplify and speed up transformations, and to remove a lot of intermediate
>>>>>> stages (distinct + join => just cogroup).
>>>>>>
>>>>>> Plus Spark 2.4 introduced a lot of new operators for nested data.
>>>>>> That's a win!
>>>>>>
>>>>>>
>>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ic...@gmail.com> wrote:
>>>>>>
>>>>>>> I am wondering: do other people have opinions or use cases for cogroup?
>>>>>>>
>>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ic...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Alessandro,
>>>>>>>>
>>>>>>>> Thanks for the reply. I assume by "equi-join", you mean "equality
>>>>>>>> full outer join".
>>>>>>>>
>>>>>>>> Two issues I see with an equality outer join are:
>>>>>>>> (1) an equality outer join will give n * m rows for each key (n and m
>>>>>>>> being the corresponding number of rows in df1 and df2 for each key)
>>>>>>>> (2) the user needs to do some extra processing to transform the n * m
>>>>>>>> rows back to the desired shape (two sub dataframes with n and m rows)
>>>>>>>>
>>>>>>>> I think a full outer join is an inefficient way to implement cogroup.
>>>>>>>> If the end goal is to have two separate dataframes for each key, why
>>>>>>>> join them first and then unjoin them?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>>>>>>> alessandro.solimando@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>> I fail to see how an equi-join on the key columns is different
>>>>>>>>> than the cogroup you propose.
>>>>>>>>>
>>>>>>>>> I think the accepted answer can shed some light:
>>>>>>>>>
>>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>>
>>>>>>>>> Now you apply an udf on each iterable, one per key value (obtained
>>>>>>>>> with cogroup).
>>>>>>>>>
>>>>>>>>> You can achieve the same by:
>>>>>>>>> 1) join df1 and df2 on the key you want,
>>>>>>>>> 2) apply "groupby" on such key
>>>>>>>>> 3) finally apply a udaf (you can have a look here if you are not
>>>>>>>>> familiar with them
>>>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>>>>>>>> that will process each group "in isolation".
>>>>>>>>>
>>>>>>>>> HTH,
>>>>>>>>> Alessandro
>>>>>>>>>
>>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ic...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> We have been using Pyspark's groupby().apply() quite a bit and it
>>>>>>>>>> has been very helpful in integrating Spark with our existing pandas-heavy
>>>>>>>>>> libraries.
>>>>>>>>>>
>>>>>>>>>> Recently, we have found more and more cases where
>>>>>>>>>> groupby().apply() is not sufficient - In some cases, we want to group two
>>>>>>>>>> dataframes by the same key, and apply a function which takes two
>>>>>>>>>> pd.DataFrame (also returns a pd.DataFrame) for each key. This feels very
>>>>>>>>>> much like the "cogroup" operation in the RDD API.
>>>>>>>>>>
>>>>>>>>>> It would be great to be able to do something like this (not actual
>>>>>>>>>> API, just to explain the use case):
>>>>>>>>>>
>>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>>>      # pdf1 and pdf2 are the subsets of the original dataframes
>>>>>>>>>>      # that are associated with a particular key
>>>>>>>>>>      result = ... # some code that uses pdf1 and pdf2
>>>>>>>>>>      return result
>>>>>>>>>>
>>>>>>>>>> df3  = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>>
>>>>>>>>>> I have searched around the problem and some people have suggested
>>>>>>>>>> joining the tables first. However, it's often not the same pattern, and
>>>>>>>>>> it's hard to get it to work using joins.
>>>>>>>>>>
>>>>>>>>>> I wonder what people's thoughts on this are.
>>>>>>>>>>
>>>>>>>>>> Li
>>>>>>>>>>
>>>>>>>>>>

-- 
Ryan Blue
Software Engineer
Netflix
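
For readers following the n * m argument quoted above, here is a minimal
pandas-only sketch (not Spark code, and not part of the proposal). The frames
df1 and df2, the column names and the helper cogroup() are all made up for
illustration. It only contrasts the number of rows a full outer join produces
per key with the two separate sub-frames a cogroup would hand to the function.

```python
import pandas as pd

# Hypothetical inputs: key "a" has 3 rows on the left and 2 on the right.
df1 = pd.DataFrame({"key": ["a", "a", "a", "b"], "x": [1, 2, 3, 4]})
df2 = pd.DataFrame({"key": ["a", "a", "c"], "y": [10, 20, 30]})

# Full outer equi-join: every left row for a key is paired with every
# right row for that key, so key "a" yields 3 * 2 = 6 rows.
joined = df1.merge(df2, on="key", how="outer")
rows_per_key_join = joined.groupby("key").size().to_dict()


def cogroup(left, right, key):
    """Yield (key, left_subframe, right_subframe) for each key in either frame.

    Illustrative helper only; a missing side is an empty frame with the
    same columns as the original.
    """
    left_groups = {k: g for k, g in left.groupby(key)}
    right_groups = {k: g for k, g in right.groupby(key)}
    for k in sorted(set(left_groups) | set(right_groups)):
        yield (
            k,
            left_groups.get(k, left.iloc[0:0]),
            right_groups.get(k, right.iloc[0:0]),
        )


# Cogroup: the function sees n rows and m rows separately, never n * m.
rows_per_key_cogroup = {
    k: (len(left_sub), len(right_sub))
    for k, left_sub, right_sub in cogroup(df1, df2, "key")
}
```

With these inputs the join carries 6 rows for key "a", while the cogroup
passes a 3-row frame and a 2-row frame, which is the shape the function
wanted in the first place.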

Re: Thoughts on dataframe cogroup?

Posted by Li Jin <ic...@gmail.com>.
Thank you Chris, this looks great.

Would you mind sharing a Google Doc version of the proposal? I believe that's
the preferred way of discussing proposals (other people, please correct me
if I am wrong).

Li

On Mon, Apr 15, 2019 at 8:20 AM <ch...@cmartinit.co.uk> wrote:

> Hi,
>
>  As promised I’ve raised SPARK-27463 for this.
>
> All feedback welcome!
>
> Chris
>
> On 9 Apr 2019, at 13:22, Chris Martin <ch...@cmartinit.co.uk> wrote:
>
> Thanks Bryan and Li, that is much appreciated.  Hopefully I should have the
> SPIP ready in the next couple of days.
>
> thanks,
>
> Chris
>
>
>
>
> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cu...@gmail.com> wrote:
>
>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't be too
>> difficult to extend the current functionality to transfer multiple
>> DataFrames.  For the SPIP, I would keep it more high-level; I don't
>> think it's necessary to include details of the Python worker. We can hash
>> that out after the SPIP is approved.
>>
>> Bryan
>>
>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ic...@gmail.com> wrote:
>>
>>> Thanks Chris, look forward to it.
>>>
>>> I think sending multiple dataframes to the python worker requires some
>>> changes but shouldn't be too difficult. We can probably do something like:
>>>
>>>
>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>
>>> In:
>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>
>>> And have ArrowPythonRunner take multiple input iterator/schema.
>>>
>>> Li
>>>
>>>
>>> On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:
>>>
>>>> Hi,
>>>>
>>>> Just to say, I really do think this is useful and am currently working
>>>> on a SPIP to formally propose this. One concern I do have, however, is that
>>>> the current arrow serialization code is tied to passing through a single
>>>> dataframe as the udf parameter and so any modification to allow multiple
>>>> dataframes may not be straightforward.  If anyone has any ideas as to how
>>>> this might be achieved in an elegant manner I’d be happy to hear them!
>>>>
>>>> Thanks,
>>>>
>>>> Chris
>>>>
>>>> On 26 Feb 2019, at 14:55, Li Jin <ic...@gmail.com> wrote:
>>>>
>>>> Thank you both for the reply. Chris and I have very similar use cases
>>>> for cogroup.
>>>>
>>>> One of the goals for groupby apply + pandas UDF was to avoid things
>>>> like collect list and reshaping data between Spark and Pandas. Cogroup
>>>> feels very similar and can be an extension to the groupby apply + pandas
>>>> UDF functionality.
>>>>
>>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>>
>>>> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>>>>
>>>>> Just to add to this I’ve also implemented my own cogroup previously
>>>>> and would welcome a cogroup for dataframes.
>>>>>
>>>>> My specific use case was that I had a large amount of time series
>>>>> data. Spark has very limited support for time series (specifically as-of
>>>>> joins), but pandas has good support.
>>>>>
>>>>> My solution was to take my two dataframes and perform a group by and
>>>>> collect list on each. The resulting arrays could be passed into a udf where
>>>>> they could be marshaled into a couple of pandas dataframes and processed
>>>>> using pandas' excellent time series functionality.
>>>>>
>>>>> If cogroup was available natively on dataframes this would have been a
>>>>> bit nicer. The ideal would have been some pandas udf version of cogroup
>>>>> that gave me a pandas dataframe for each spark dataframe in the cogroup!
>>>>>
>>>>> Chris
>>>>>
>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <jo...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> For info, our team has defined its own cogroup on dataframes in the
>>>>> past on different projects using different methods (rdd[row] based or union
>>>>> all collect list based).
>>>>>
>>>>> I might be biased, but I find the approach very useful in projects to
>>>>> simplify and speed up transformations, and to remove a lot of intermediate
>>>>> stages (distinct + join => just cogroup).
>>>>>
>>>>> Plus Spark 2.4 introduced a lot of new operators for nested data.
>>>>> That's a win!
>>>>>
>>>>>
>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ic...@gmail.com> wrote:
>>>>>
>>>>>> I am wondering: do other people have opinions or use cases for cogroup?
>>>>>>
>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ic...@gmail.com> wrote:
>>>>>>
>>>>>>> Alessandro,
>>>>>>>
>>>>>>> Thanks for the reply. I assume by "equi-join", you mean "equality
>>>>>>> full outer join".
>>>>>>>
>>>>>>> Two issues I see with an equality outer join are:
>>>>>>> (1) an equality outer join will give n * m rows for each key (n and m
>>>>>>> being the corresponding number of rows in df1 and df2 for each key)
>>>>>>> (2) the user needs to do some extra processing to transform the n * m
>>>>>>> rows back to the desired shape (two sub dataframes with n and m rows)
>>>>>>>
>>>>>>> I think a full outer join is an inefficient way to implement cogroup.
>>>>>>> If the end goal is to have two separate dataframes for each key, why
>>>>>>> join them first and then unjoin them?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>>>>>> alessandro.solimando@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>> I fail to see how an equi-join on the key columns is different than
>>>>>>>> the cogroup you propose.
>>>>>>>>
>>>>>>>> I think the accepted answer can shed some light:
>>>>>>>>
>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>
>>>>>>>> Now you apply an udf on each iterable, one per key value (obtained
>>>>>>>> with cogroup).
>>>>>>>>
>>>>>>>> You can achieve the same by:
>>>>>>>> 1) join df1 and df2 on the key you want,
>>>>>>>> 2) apply "groupby" on such key
>>>>>>>> 3) finally apply a udaf (you can have a look here if you are not
>>>>>>>> familiar with them
>>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>>>>>>> that will process each group "in isolation".
>>>>>>>>
>>>>>>>> HTH,
>>>>>>>> Alessandro
>>>>>>>>
>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ic...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> We have been using Pyspark's groupby().apply() quite a bit and it
>>>>>>>>> has been very helpful in integrating Spark with our existing pandas-heavy
>>>>>>>>> libraries.
>>>>>>>>>
>>>>>>>>> Recently, we have found more and more cases where
>>>>>>>>> groupby().apply() is not sufficient - In some cases, we want to group two
>>>>>>>>> dataframes by the same key, and apply a function which takes two
>>>>>>>>> pd.DataFrame (also returns a pd.DataFrame) for each key. This feels very
>>>>>>>>> much like the "cogroup" operation in the RDD API.
>>>>>>>>>
>>>>>>>>> It would be great to be able to do something like this (not actual API,
>>>>>>>>> just to explain the use case):
>>>>>>>>>
>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>>      # pdf1 and pdf2 are the subsets of the original dataframes
>>>>>>>>>      # that are associated with a particular key
>>>>>>>>>      result = ... # some code that uses pdf1 and pdf2
>>>>>>>>>      return result
>>>>>>>>>
>>>>>>>>> df3  = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>
>>>>>>>>> I have searched around the problem and some people have suggested
>>>>>>>>> joining the tables first. However, it's often not the same pattern, and
>>>>>>>>> it's hard to get it to work using joins.
>>>>>>>>>
>>>>>>>>> I wonder what people's thoughts on this are.
>>>>>>>>>
>>>>>>>>> Li
>>>>>>>>>
>>>>>>>>>

Re: Thoughts on dataframe cogroup?

Posted by ch...@cmartinit.co.uk.
Hi,

As promised, I’ve raised SPARK-27463 for this.

All feedback welcome!

Chris 

> On 9 Apr 2019, at 13:22, Chris Martin <ch...@cmartinit.co.uk> wrote:
> 
> Thanks Bryan and Li, that is much appreciated.  Hopefully I should have the SPIP ready in the next couple of days.
> 
> thanks,
> 
> Chris
> 
> 
> 
> 
>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cu...@gmail.com> wrote:
>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't be too difficult to extend the current functionality to transfer multiple DataFrames.  For the SPIP, I would keep it more high-level; I don't think it's necessary to include details of the Python worker. We can hash that out after the SPIP is approved.
>> 
>> Bryan
>> 
>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ic...@gmail.com> wrote:
>>> Thanks Chris, look forward to it.
>>> 
>>> I think sending multiple dataframes to the python worker requires some changes but shouldn't be too difficult. We can probably do something like:
>>> 
>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>> 
>>> In: https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>> 
>>> And have ArrowPythonRunner take multiple input iterator/schema.
>>> 
>>> Li
>>> 
>>> 
>>>> On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:
>>>> Hi,
>>>> 
>>>> Just to say, I really do think this is useful and am currently working on a SPIP to formally propose this. One concern I do have, however, is that the current arrow serialization code is tied to passing through a single dataframe as the udf parameter and so any modification to allow multiple dataframes may not be straightforward.  If anyone has any ideas as to how this might be achieved in an elegant manner I’d be happy to hear them!
>>>> 
>>>> Thanks,
>>>> 
>>>> Chris 
>>>> 
>>>>> On 26 Feb 2019, at 14:55, Li Jin <ic...@gmail.com> wrote:
>>>>> 
>>>>> Thank you both for the reply. Chris and I have very similar use cases for cogroup. 
>>>>> 
>>>>> One of the goals for groupby apply + pandas UDF was to avoid things like collect list and reshaping data between Spark and Pandas. Cogroup feels very similar and can be an extension to the groupby apply + pandas UDF functionality.
>>>>> 
>>>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>>> 
>>>>>> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>>>>>> Just to add to this I’ve also implemented my own cogroup previously and would welcome a cogroup for dataframes.
>>>>>> 
>>>>>> My specific use case was that I had a large amount of time series data. Spark has very limited support for time series (specifically as-of joins), but pandas has good support.
>>>>>> 
>>>>>> My solution was to take my two dataframes and perform a group by and collect list on each. The resulting arrays could be passed into a udf where they could be marshaled into a couple of pandas dataframes and processed using pandas' excellent time series functionality.
>>>>>> 
>>>>>> If cogroup was available natively on dataframes this would have been a bit nicer. The ideal would have been some pandas udf version of cogroup that gave me a pandas dataframe for each spark dataframe in the cogroup!
>>>>>> 
>>>>>> Chris 
>>>>>> 
>>>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <jo...@gmail.com> wrote:
>>>>>>> 
>>>>>>> For info, our team has defined its own cogroup on dataframes in the past on different projects using different methods (rdd[row] based or union all collect list based).
>>>>>>> 
>>>>>>> I might be biased, but I find the approach very useful in projects to simplify and speed up transformations, and to remove a lot of intermediate stages (distinct + join => just cogroup).
>>>>>>> 
>>>>>>> Plus Spark 2.4 introduced a lot of new operators for nested data. That's a win!
>>>>>>> 
>>>>>>> 
>>>>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ic...@gmail.com> wrote:
>>>>>>>> I am wondering: do other people have opinions or use cases for cogroup?
>>>>>>>> 
>>>>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ic...@gmail.com> wrote:
>>>>>>>>> Alessandro,
>>>>>>>>> 
>>>>>>>>> Thanks for the reply. I assume by "equi-join", you mean "equality full outer join".
>>>>>>>>> 
>>>>>>>>> Two issues I see with an equality outer join are:
>>>>>>>>> (1) an equality outer join will give n * m rows for each key (n and m being the corresponding number of rows in df1 and df2 for each key)
>>>>>>>>> (2) the user needs to do some extra processing to transform the n * m rows back to the desired shape (two sub dataframes with n and m rows)
>>>>>>>>> 
>>>>>>>>> I think a full outer join is an inefficient way to implement cogroup. If the end goal is to have two separate dataframes for each key, why join them first and then unjoin them?
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <al...@gmail.com> wrote:
>>>>>>>>>> Hello,
>>>>>>>>>> I fail to see how an equi-join on the key columns is different than the cogroup you propose.
>>>>>>>>>> 
>>>>>>>>>> I think the accepted answer can shed some light:
>>>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>>> 
>>>>>>>>>> Now you apply an udf on each iterable, one per key value (obtained with cogroup).
>>>>>>>>>> 
>>>>>>>>>> You can achieve the same by: 
>>>>>>>>>> 1) join df1 and df2 on the key you want, 
>>>>>>>>>> 2) apply "groupby" on such key
>>>>>>>>>> 3) finally apply a udaf (you can have a look here if you are not familiar with them https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html), that will process each group "in isolation".
>>>>>>>>>> 
>>>>>>>>>> HTH,
>>>>>>>>>> Alessandro
>>>>>>>>>> 
>>>>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ic...@gmail.com> wrote:
>>>>>>>>>>> Hi,
>>>>>>>>>>> 
>>>>>>>>>>> We have been using Pyspark's groupby().apply() quite a bit and it has been very helpful in integrating Spark with our existing pandas-heavy libraries.
>>>>>>>>>>> 
>>>>>>>>>>> Recently, we have found more and more cases where groupby().apply() is not sufficient - In some cases, we want to group two dataframes by the same key, and apply a function which takes two pd.DataFrame (also returns a pd.DataFrame) for each key. This feels very much like the "cogroup" operation in the RDD API.
>>>>>>>>>>> 
>>>>>>>>>>> It would be great to be able to do something like this (not actual API, just to explain the use case):
>>>>>>>>>>> 
>>>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>>>>       # pdf1 and pdf2 are the subsets of the original dataframes that are associated with a particular key
>>>>>>>>>>>       result = ... # some code that uses pdf1 and pdf2
>>>>>>>>>>>       return result   
>>>>>>>>>>> 
>>>>>>>>>>> df3  = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>>> 
>>>>>>>>>>> I have searched around the problem and some people have suggested joining the tables first. However, it's often not the same pattern, and it's hard to get it to work using joins.
>>>>>>>>>>> 
>>>>>>>>>>> I wonder what people's thoughts on this are.
>>>>>>>>>>> 
>>>>>>>>>>> Li
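
The [numberOfDataFrames][First...][Second...] layout suggested in the quoted
thread can be sketched roughly as below. This is only an illustration of the
framing idea using the Python standard library: plain byte strings stand in
for the Arrow-encoded dataframes, and the per-payload length prefix and the
function names write_frames/read_frames are assumptions made here, not
anything taken from ArrowPythonRunner or the Python worker.

```python
import io
import struct


def write_frames(stream, payloads):
    """Write a count header, then each payload prefixed by its byte length."""
    stream.write(struct.pack(">i", len(payloads)))  # [numberOfDataFrames]
    for payload in payloads:
        stream.write(struct.pack(">i", len(payload)))  # assumed length prefix
        stream.write(payload)  # stand-in for one Arrow-encoded dataframe


def read_frames(stream):
    """Read back the payloads written by write_frames, in order."""
    (count,) = struct.unpack(">i", stream.read(4))
    payloads = []
    for _ in range(count):
        (size,) = struct.unpack(">i", stream.read(4))
        payloads.append(stream.read(size))
    return payloads


# Round trip through an in-memory buffer with two placeholder payloads.
buf = io.BytesIO()
write_frames(buf, [b"first-dataframe", b"second-dataframe"])
buf.seek(0)
decoded = read_frames(buf)
```

The reader learns up front how many dataframes to expect for the key, so the
udf can be called with one argument per dataframe once all of them are read.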

Re: Thoughts on dataframe cogroup?

Posted by Chris Martin <ch...@cmartinit.co.uk>.
Thanks Bryan and Li, that is much appreciated.  Hopefully I should have the
SPIP ready in the next couple of days.

thanks,

Chris




On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cu...@gmail.com> wrote:

> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't be too
> difficult to extend the current functionality to transfer multiple
> DataFrames.  For the SPIP, I would keep it more high-level; I don't
> think it's necessary to include details of the Python worker. We can hash
> that out after the SPIP is approved.
>
> Bryan
>
> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ic...@gmail.com> wrote:
>
>> Thanks Chris, look forward to it.
>>
>> I think sending multiple dataframes to the python worker requires some
>> changes but shouldn't be too difficult. We can probably do something like:
>>
>>
>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>
>> In:
>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>
>> And have ArrowPythonRunner take multiple input iterator/schema.
>>
>> Li
>>
>>
>> On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:
>>
>>> Hi,
>>>
>>> Just to say, I really do think this is useful and am currently working
>>> on a SPIP to formally propose this. One concern I do have, however, is that
>>> the current arrow serialization code is tied to passing through a single
>>> dataframe as the udf parameter and so any modification to allow multiple
>>> dataframes may not be straightforward.  If anyone has any ideas as to how
>>> this might be achieved in an elegant manner I’d be happy to hear them!
>>>
>>> Thanks,
>>>
>>> Chris
>>>
>>> On 26 Feb 2019, at 14:55, Li Jin <ic...@gmail.com> wrote:
>>>
>>> Thank you both for the reply. Chris and I have very similar use cases
>>> for cogroup.
>>>
>>> One of the goals for groupby apply + pandas UDF was to avoid things like
>>> collect list and reshaping data between Spark and Pandas. Cogroup feels
>>> very similar and can be an extension to the groupby apply + pandas UDF
>>> functionality.
>>>
>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>
>>> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>>>
>>>> Just to add to this I’ve also implemented my own cogroup previously and
>>>> would welcome a cogroup for dataframes.
>>>>
>>>> My specific use case was that I had a large amount of time series data.
>>>> Spark has very limited support for time series (specifically as-of joins),
>>>> but pandas has good support.
>>>>
>>>> My solution was to take my two dataframes and perform a group by and
>>>> collect list on each. The resulting arrays could be passed into a udf where
>>>> they could be marshaled into a couple of pandas dataframes and processed
>>>> using pandas' excellent time series functionality.
>>>>
>>>> If cogroup was available natively on dataframes this would have been a
>>>> bit nicer. The ideal would have been some pandas udf version of cogroup
>>>> that gave me a pandas dataframe for each spark dataframe in the cogroup!
>>>>
>>>> Chris
>>>>
>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <jo...@gmail.com>
>>>> wrote:
>>>>
>>>> For info, our team has defined its own cogroup on dataframes in the
>>>> past on different projects using different methods (rdd[row] based or union
>>>> all collect list based).
>>>>
>>>> I might be biased, but I find the approach very useful in projects to
>>>> simplify and speed up transformations, and to remove a lot of intermediate
>>>> stages (distinct + join => just cogroup).
>>>>
>>>> Plus Spark 2.4 introduced a lot of new operators for nested data. That's
>>>> a win!
>>>>
>>>>
>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ic...@gmail.com> wrote:
>>>>
>>>>> I am wondering: do other people have opinions or use cases for cogroup?
>>>>>
>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ic...@gmail.com> wrote:
>>>>>
>>>>>> Alessandro,
>>>>>>
>>>>>> Thanks for the reply. I assume by "equi-join", you mean "equality
>>>>>> full outer join".
>>>>>>
>>>>>> Two issues I see with an equality outer join are:
>>>>>> (1) an equality outer join will give n * m rows for each key (n and m
>>>>>> being the corresponding number of rows in df1 and df2 for each key)
>>>>>> (2) the user needs to do some extra processing to transform the n * m
>>>>>> rows back to the desired shape (two sub dataframes with n and m rows)
>>>>>>
>>>>>> I think a full outer join is an inefficient way to implement cogroup.
>>>>>> If the end goal is to have two separate dataframes for each key, why
>>>>>> join them first and then unjoin them?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>>>>> alessandro.solimando@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>> I fail to see how an equi-join on the key columns is different than
>>>>>>> the cogroup you propose.
>>>>>>>
>>>>>>> I think the accepted answer can shed some light:
>>>>>>>
>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>
>>>>>>> Now you apply an udf on each iterable, one per key value (obtained
>>>>>>> with cogroup).
>>>>>>>
>>>>>>> You can achieve the same by:
>>>>>>> 1) join df1 and df2 on the key you want,
>>>>>>> 2) apply "groupby" on such key
>>>>>>> 3) finally apply a udaf (you can have a look here if you are not
>>>>>>> familiar with them
>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>>>>>> that will process each group "in isolation".
>>>>>>>
>>>>>>> HTH,
>>>>>>> Alessandro
>>>>>>>
>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ic...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> We have been using Pyspark's groupby().apply() quite a bit and it
>>>>>>>> has been very helpful in integrating Spark with our existing pandas-heavy
>>>>>>>> libraries.
>>>>>>>>
>>>>>>>> Recently, we have found more and more cases where groupby().apply()
>>>>>>>> is not sufficient - In some cases, we want to group two dataframes by the
>>>>>>>> same key, and apply a function which takes two pd.DataFrame (also returns a
>>>>>>>> pd.DataFrame) for each key. This feels very much like the "cogroup"
>>>>>>>> operation in the RDD API.
>>>>>>>>
>>>>>>>> It would be great to be able to do sth like this: (not actual API,
>>>>>>>> just to explain the use case):
>>>>>>>>
>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>> def my_udf(pdf1, pdf2)
>>>>>>>>      # pdf1 and pdf2 are the subset of the original dataframes that
>>>>>>>> is associated with a particular key
>>>>>>>>      result = ... # some code that uses pdf1 and pdf2
>>>>>>>>      return result
>>>>>>>>
>>>>>>>> df3  = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>
>>>>>>>> I have searched around the problem and some people have suggested
>>>>>>>> to join the tables first. However, it's often not the same pattern and hard
>>>>>>>> to get it to work by using joins.
>>>>>>>>
>>>>>>>> I wonder what are people's thought on this?
>>>>>>>>
>>>>>>>> Li
>>>>>>>>
>>>>>>>>

Re: Thoughts on dataframe cogroup?

Posted by Bryan Cutler <cu...@gmail.com>.
Chris, an SPIP sounds good to me. I agree with Li that it wouldn't be too
difficult to extend the current functionality to transfer multiple
DataFrames. I would keep the SPIP fairly high-level; I don't think it needs
to include details of the Python worker. We can hash those out after the
SPIP is approved.

Bryan

On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ic...@gmail.com> wrote:

> Thanks Chris, look forward to it.
>
> I think sending multiple dataframes to the python worker requires some
> changes but shouldn't be too difficult. We can probably do something like:
>
>
> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>
> In:
> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>
> And have ArrowPythonRunner take multiple input iterator/schema.
>
> Li
>
>

Re: Thoughts on dataframe cogroup?

Posted by Li Jin <ic...@gmail.com>.
Thanks Chris, look forward to it.

I think sending multiple dataframes to the Python worker requires some
changes but shouldn't be too difficult. We can probably do something like:

[numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]

In:
https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70

and have ArrowPythonRunner take multiple input iterators/schemas.
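
To make the layout above concrete, here is a minimal sketch in plain Python
of writing a count followed by each serialized dataframe, and reading them
back. It is only an illustration, not Spark's actual wire format: the names
frame_payloads and unframe_payloads are hypothetical, the payloads are opaque
bytes standing in for Arrow-serialized dataframes, and the per-payload length
prefix is an added assumption (opaque bytes need one to be split apart again).

```python
import struct
from typing import List


def frame_payloads(payloads: List[bytes]) -> bytes:
    """Pack payloads as [count][len0][payload0][len1][payload1]...

    Each payload is a placeholder for one serialized dataframe.
    Integers are written as 4-byte big-endian signed values.
    """
    parts = [struct.pack(">i", len(payloads))]
    for payload in payloads:
        parts.append(struct.pack(">i", len(payload)))
        parts.append(payload)
    return b"".join(parts)


def unframe_payloads(buf: bytes) -> List[bytes]:
    """Inverse of frame_payloads: read the count, then each sized payload."""
    (count,) = struct.unpack_from(">i", buf, 0)
    offset = 4
    payloads = []
    for _ in range(count):
        (size,) = struct.unpack_from(">i", buf, offset)
        offset += 4
        payloads.append(buf[offset:offset + size])
        offset += size
    return payloads
```

The reader side would then hand the first payload to the UDF as pdf1 and the
second as pdf2; how that maps onto the existing runner is left open here.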

Li


On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:

> Hi,
>
> Just to say, I really do think this is useful and am currently working on
> a SPIP to formally propose this. One concern I do have, however, is that
> the current arrow serialization code is tied to passing through a single
> dataframe as the udf parameter and so any modification to allow multiple
> dataframes may not be straightforward.  If anyone has any ideas as to how
> this might be achieved in an elegant manner I’d be happy to hear them!
>
> Thanks,
>
> Chris
>

Re: Thoughts on dataframe cogroup?

Posted by ch...@cmartinit.co.uk.
Hi,

Just to say, I really do think this is useful and am currently working on a SPIP to formally propose this. One concern I do have, however, is that the current Arrow serialization code is tied to passing a single dataframe through as the UDF parameter, so any modification to allow multiple dataframes may not be straightforward. If anyone has ideas on how this might be achieved elegantly, I’d be happy to hear them!

Thanks,

Chris 

> On 26 Feb 2019, at 14:55, Li Jin <ic...@gmail.com> wrote:
> 
> Thank you both for the reply. Chris and I have very similar use cases for cogroup. 
> 
> One of the goals for groupby apply + pandas UDF was to avoid things like collect list and reshaping data between Spark and Pandas. Cogroup feels very similar and can be an extension to the groupby apply + pandas UDF functionality.
> 
> I wonder if any PMC/committers have any thoughts/opinions on this?
> 

Re: Thoughts on dataframe cogroup?

Posted by Li Jin <ic...@gmail.com>.
Thank you both for the reply. Chris and I have very similar use cases for
cogroup.

One of the goals of groupby().apply() with pandas UDFs was to avoid things
like collect_list and reshaping data between Spark and pandas. Cogroup feels
very similar and could be an extension of the groupby().apply() + pandas UDF
functionality.

I wonder if any PMC/committers have any thoughts/opinions on this?

On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:

> Just to add to this, I’ve also implemented my own cogroup previously and
> would welcome a cogroup for dataframes.
>
> My specific use case was that I had a large amount of time series data.
> Spark has very limited support for time series (specifically as-of joins),
> but pandas has good support.
>
> My solution was to take my two dataframes and perform a group by and
> collect list on each. The resulting arrays could be passed into a udf where
> they could be marshaled into a couple of pandas dataframes and processed
> using pandas' excellent time series functionality.
>
> If cogroup was available natively on dataframes this would have been a bit
> nicer. The ideal would have been some pandas udf version of cogroup that
> gave me a pandas dataframe for each spark dataframe in the cogroup!
>
> Chris
>

Re: Thoughts on dataframe cogroup?

Posted by ch...@cmartinit.co.uk.
Just to add to this, I’ve also implemented my own cogroup previously and would welcome a cogroup for dataframes.

My specific use case was that I had a large amount of time series data. Spark has very limited support for time series (specifically as-of joins), but pandas has good support.

My solution was to take my two dataframes and perform a group-by and collect_list on each. The resulting arrays could be passed into a UDF, where they could be marshaled into a couple of pandas dataframes and processed using pandas' excellent time series functionality.

If cogroup were available natively on dataframes, this would have been a bit nicer. The ideal would have been some pandas UDF version of cogroup that gave me a pandas dataframe for each Spark dataframe in the cogroup!
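
For anyone unfamiliar with the pattern, the workaround can be sketched in
plain Python, with lists of dicts standing in for the collected rows. This is
an illustration only, not the code I actually ran: collect_by_key,
asof_join and cogroup_apply are hypothetical names, and the hand-written
as-of lookup stands in for what pandas would do on real dataframes.

```python
from bisect import bisect_right
from collections import defaultdict


def collect_by_key(rows, key):
    """Group rows (dicts) by key, roughly like groupBy(key) + collect_list."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    return groups


def asof_join(left, right, on):
    """Pair each left row with the latest right row where right[on] <= left[on].

    Left rows with no earlier right row are paired with None.
    """
    right_sorted = sorted(right, key=lambda r: r[on])
    times = [r[on] for r in right_sorted]
    out = []
    for row in sorted(left, key=lambda r: r[on]):
        i = bisect_right(times, row[on])
        out.append((row, right_sorted[i - 1] if i > 0 else None))
    return out


def cogroup_apply(rows1, rows2, key, func):
    """Call func(group1, group2) once per key found in either input."""
    g1 = collect_by_key(rows1, key)
    g2 = collect_by_key(rows2, key)
    result = []
    for k in sorted(set(g1) | set(g2)):
        result.extend(func(g1.get(k, []), g2.get(k, [])))
    return result
```

With trades and quotes keyed by symbol, the per-key function would be
something like lambda l, r: asof_join(l, r, "t"). A native cogroup would
remove the collect and marshal steps and hand over two dataframes directly.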

Chris 

> On 26 Feb 2019, at 00:38, Jonathan Winandy <jo...@gmail.com> wrote:
> 
> For info, our team has defined its own cogroup on dataframes in the past, on different projects, using different methods (rdd[row]-based or union-all + collect-list based).
> 
> I might be biased, but I find the approach very useful in projects to simplify and speed up transformations, and to remove a lot of intermediate stages (distinct + join => just cogroup).
> 
> Plus, Spark 2.4 introduced a lot of new operators for nested data. That's a win!
> 
> 

Re: Thoughts on dataframe cogroup?

Posted by Jonathan Winandy <jo...@gmail.com>.
For info, our team has defined its own cogroup on dataframes in the past,
on different projects, using different methods (rdd[row]-based or union-all
+ collect-list based).

I might be biased, but I find the approach very useful in projects to
simplify and speed up transformations, and to remove a lot of intermediate
stages (distinct + join => just cogroup).
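
As a rough illustration of the union-all variant, here is a plain-Python
sketch with lists of dicts standing in for dataframes. It is not our
production code: join_rows and cogroup_union_all are hypothetical names, and
the join is included only to contrast the shapes of the two results.

```python
from collections import defaultdict


def join_rows(rows1, rows2, key):
    """Inner equi-join: each left row pairs with every right row of its key."""
    right = defaultdict(list)
    for r in rows2:
        right[r[key]].append(r)
    return [(l, r) for l in rows1 for r in right.get(l[key], [])]


def cogroup_union_all(rows1, rows2, key):
    """Tag rows with their source, union them, group by key, split by tag.

    Returns {key: (rows_from_1, rows_from_2)}; a side with no rows for a
    key is an empty list.
    """
    tagged = [(0, r) for r in rows1] + [(1, r) for r in rows2]
    groups = defaultdict(lambda: ([], []))
    for side, row in tagged:
        groups[row[key]][side].append(row)
    return dict(groups)
```

For a key with n rows on one side and m on the other, the join produces
n * m pairs, while the cogroup keeps the two sides apart as lists of n and m
rows, so nothing has to be de-duplicated afterwards.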

Plus, Spark 2.4 introduced a lot of new operators for nested data. That's a
win!


On Thu, 21 Feb 2019, 17:38 Li Jin, <ic...@gmail.com> wrote:

> I am wondering whether other people have opinions or use cases for cogroup?
>
> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ic...@gmail.com> wrote:
>
>> Alessandro,
>>
>> Thanks for the reply. I assume by "equi-join", you mean "equality  full
>> outer join" .
>>
>> Two issues I see with equity outer join is:
>> (1) equity outer join will give n * m rows for each key (n and m being
>> the corresponding number of rows in df1 and df2 for each key)
>> (2) User needs to do some extra processing to transform n * m back to the
>> desired shape (two sub dataframes with n and m rows)
>>
>> I think full outer join is an inefficient way to implement cogroup. If
>> the end goal is to have two separate dataframes for each key, why joining
>> them first and then unjoin them?
>>
>>
>>
>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>> alessandro.solimando@gmail.com> wrote:
>>
>>> Hello,
>>> I fail to see how an equi-join on the key columns is different from the
>>> cogroup you propose.
>>>
>>> I think the accepted answer can shed some light:
>>>
>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>
>>> Now you apply a UDF on each iterable, one per key value (obtained with
>>> cogroup).
>>>
>>> You can achieve the same by:
>>> 1) join df1 and df2 on the key you want,
>>> 2) apply "groupby" on that key,
>>> 3) finally, apply a UDAF (you can have a look here if you are not
>>> familiar with them
>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>> that will process each group "in isolation".
>>>
>>> HTH,
>>> Alessandro
>>>
>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ic...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> We have been using Pyspark's groupby().apply() quite a bit and it has
>>>> been very helpful in integrating Spark with our existing pandas-heavy
>>>> libraries.
>>>>
>>>> Recently, we have found more and more cases where groupby().apply() is
>>>> not sufficient - In some cases, we want to group two dataframes by the same
>>>> key, and apply a function which takes two pd.DataFrame (also returns a
>>>> pd.DataFrame) for each key. This feels very much like the "cogroup"
>>>> operation in the RDD API.
>>>>
>>>> It would be great to be able to do something like this (not an actual
>>>> API, just to explain the use case):
>>>>
>>>> @pandas_udf(return_schema, ...)
>>>> def my_udf(pdf1, pdf2):
>>>>      # pdf1 and pdf2 are the subsets of the original dataframes that are
>>>>      # associated with a particular key
>>>>      result = ... # some code that uses pdf1 and pdf2
>>>>      return result
>>>>
>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>
>>>> I have searched around the problem and some people have suggested
>>>> joining the tables first. However, it's often not the same pattern and
>>>> it is hard to get it to work by using joins.
>>>>
>>>> I wonder what people's thoughts are on this?
>>>>
>>>> Li
>>>>
>>>>

Re: Thoughts on dataframe cogroup?

Posted by Li Jin <ic...@gmail.com>.
I am wondering: do other people have opinions on or use cases for cogroup?

On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ic...@gmail.com> wrote:

> Alessandro,
>
> Thanks for the reply. I assume by "equi-join", you mean "equality full
> outer join".
>
> Two issues I see with an equality outer join are:
> (1) An equality outer join will give n * m rows for each key (n and m being
> the corresponding number of rows in df1 and df2 for each key)
> (2) The user needs to do some extra processing to transform the n * m rows
> back to the desired shape (two sub-dataframes with n and m rows)
>
> I think a full outer join is an inefficient way to implement cogroup. If the
> end goal is to have two separate dataframes for each key, why join them
> first and then unjoin them?
>
>
>
> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
> alessandro.solimando@gmail.com> wrote:
>
>> Hello,
>> I fail to see how an equi-join on the key columns is different from the
>> cogroup you propose.
>>
>> I think the accepted answer can shed some light:
>>
>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>
>> Now you apply a UDF on each iterable, one per key value (obtained with
>> cogroup).
>>
>> You can achieve the same by:
>> 1) join df1 and df2 on the key you want,
>> 2) apply "groupby" on that key,
>> 3) finally, apply a UDAF (you can have a look here if you are not familiar
>> with them
>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>> that will process each group "in isolation".
>>
>> HTH,
>> Alessandro
>>
>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ic...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> We have been using Pyspark's groupby().apply() quite a bit and it has
>>> been very helpful in integrating Spark with our existing pandas-heavy
>>> libraries.
>>>
>>> Recently, we have found more and more cases where groupby().apply() is
>>> not sufficient - In some cases, we want to group two dataframes by the same
>>> key, and apply a function which takes two pd.DataFrame (also returns a
>>> pd.DataFrame) for each key. This feels very much like the "cogroup"
>>> operation in the RDD API.
>>>
>>> It would be great to be able to do something like this (not an actual
>>> API, just to explain the use case):
>>>
>>> @pandas_udf(return_schema, ...)
>>> def my_udf(pdf1, pdf2):
>>>      # pdf1 and pdf2 are the subsets of the original dataframes that are
>>>      # associated with a particular key
>>>      result = ... # some code that uses pdf1 and pdf2
>>>      return result
>>>
>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>
>>> I have searched around the problem and some people have suggested
>>> joining the tables first. However, it's often not the same pattern and
>>> it is hard to get it to work by using joins.
>>>
>>> I wonder what people's thoughts are on this?
>>>
>>> Li
>>>
>>>

Re: Thoughts on dataframe cogroup?

Posted by Li Jin <ic...@gmail.com>.
Alessandro,

Thanks for the reply. I assume by "equi-join", you mean "equality full
outer join".

Two issues I see with an equality outer join are:
(1) An equality outer join will give n * m rows for each key (n and m being
the corresponding number of rows in df1 and df2 for each key)
(2) The user needs to do some extra processing to transform the n * m rows
back to the desired shape (two sub-dataframes with n and m rows)

I think a full outer join is an inefficient way to implement cogroup. If the
end goal is to have two separate dataframes for each key, why join them
first and then unjoin them?
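
The row-count argument can be checked with a small plain-Python sketch. It
is only an illustration under assumptions: rows are dicts, and the helpers
rows_by_key, full_outer_join and cogroup are hypothetical stand-ins written
for this example, not Spark or pandas APIs. With n = 3 rows in df1 and
m = 2 rows in df2 for key "a", the join materializes n * m pairs, while the
cogroup keeps the two sides apart with n and m rows.

```python
from collections import defaultdict


def rows_by_key(rows, key):
    """Bucket row dicts by the value of `key`."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[row[key]].append(row)
    return buckets


def full_outer_join(left, right, key):
    """Equality full outer join: every left/right pairing per key."""
    lb, rb = rows_by_key(left, key), rows_by_key(right, key)
    joined = []
    for k in sorted(set(lb) | set(rb)):
        # A side with no rows for this key contributes a single None row.
        for l in lb.get(k) or [None]:
            for r in rb.get(k) or [None]:
                joined.append((k, l, r))
    return joined


def cogroup(left, right, key):
    """Cogroup: each key maps to (left rows, right rows), kept separate."""
    lb, rb = rows_by_key(left, key), rows_by_key(right, key)
    return {k: (lb.get(k, []), rb.get(k, [])) for k in sorted(set(lb) | set(rb))}


df1 = [{"k": "a", "x": i} for i in range(3)] + [{"k": "b", "x": 9}]
df2 = [{"k": "a", "y": j} for j in range(2)]

joined = full_outer_join(df1, df2, "k")
groups = cogroup(df1, df2, "k")

rows_for_a = sum(1 for k, _, _ in joined if k == "a")  # n * m pairs
left_a, right_a = groups["a"]                          # n rows and m rows
```

Recovering left_a and right_a from the joined pairs would need an extra
de-duplication step, which is the "unjoin" cost described in point (2).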



On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
alessandro.solimando@gmail.com> wrote:

> Hello,
> I fail to see how an equi-join on the key columns is different from the
> cogroup you propose.
>
> I think the accepted answer can shed some light:
>
> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>
> Now you apply a UDF on each iterable, one per key value (obtained with
> cogroup).
>
> You can achieve the same by:
> 1) join df1 and df2 on the key you want,
> 2) apply "groupby" on that key,
> 3) finally, apply a UDAF (you can have a look here if you are not familiar
> with them
> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html), that
> will process each group "in isolation".
>
> HTH,
> Alessandro
>
> On Tue, 19 Feb 2019 at 23:30, Li Jin <ic...@gmail.com> wrote:
>
>> Hi,
>>
>> We have been using Pyspark's groupby().apply() quite a bit and it has
>> been very helpful in integrating Spark with our existing pandas-heavy
>> libraries.
>>
>> Recently, we have found more and more cases where groupby().apply() is
>> not sufficient - In some cases, we want to group two dataframes by the same
>> key, and apply a function which takes two pd.DataFrame (also returns a
>> pd.DataFrame) for each key. This feels very much like the "cogroup"
>> operation in the RDD API.
>>
>> It would be great to be able to do something like this (not an actual
>> API, just to explain the use case):
>>
>> @pandas_udf(return_schema, ...)
>> def my_udf(pdf1, pdf2):
>>      # pdf1 and pdf2 are the subsets of the original dataframes that are
>>      # associated with a particular key
>>      result = ... # some code that uses pdf1 and pdf2
>>      return result
>>
>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>
>> I have searched around the problem and some people have suggested joining
>> the tables first. However, it's often not the same pattern and it is hard
>> to get it to work by using joins.
>>
>> I wonder what people's thoughts are on this?
>>
>> Li
>>
>>

Re: Thoughts on dataframe cogroup?

Posted by Alessandro Solimando <al...@gmail.com>.
Hello,
I fail to see how an equi-join on the key columns is different from the
cogroup you propose.

I think the accepted answer can shed some light:
https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark

Now you apply a UDF on each iterable, one per key value (obtained with
cogroup).

You can achieve the same by:
1) join df1 and df2 on the key you want,
2) apply "groupby" on that key,
3) finally, apply a UDAF (you can have a look here if you are not familiar
with them https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
that will process each group "in isolation".
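
The three steps above can be sketched in plain Python as follows. This is
an illustration only, not Spark code: rows are dicts, the join is assumed
to be an inner equi-join (the join type is not specified above), and
sum_of_products is a made-up stand-in for a UDAF.

```python
from collections import defaultdict


def join(left, right, key):
    """Step 1: inner equi-join, merging each matching pair of rows."""
    joined = []
    for l in left:
        for r in right:
            if l[key] == r[key]:
                joined.append({**l, **r})
    return joined


def group_by(rows, key):
    """Step 2: collect the joined rows that share a key."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    return groups


def sum_of_products(rows):
    """Step 3: a stand-in aggregate that sees one group at a time."""
    return sum(row["x"] * row["y"] for row in rows)


df1 = [{"k": "a", "x": 1}, {"k": "a", "x": 2}, {"k": "b", "x": 3}]
df2 = [{"k": "a", "y": 10}, {"k": "b", "y": 100}]

groups = group_by(join(df1, df2, "k"), "k")
result = {k: sum_of_products(rows) for k, rows in groups.items()}
```

Note that the aggregate in step 3 receives the joined pairs for each key,
not the two original sets of rows side by side.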

HTH,
Alessandro

On Tue, 19 Feb 2019 at 23:30, Li Jin <ic...@gmail.com> wrote:

> Hi,
>
> We have been using Pyspark's groupby().apply() quite a bit and it has been
> very helpful in integrating Spark with our existing pandas-heavy libraries.
>
> Recently, we have found more and more cases where groupby().apply() is not
> sufficient - In some cases, we want to group two dataframes by the same
> key, and apply a function which takes two pd.DataFrame (also returns a
> pd.DataFrame) for each key. This feels very much like the "cogroup"
> operation in the RDD API.
>
> It would be great to be able to do something like this (not an actual API,
> just to explain the use case):
>
> @pandas_udf(return_schema, ...)
> def my_udf(pdf1, pdf2):
>      # pdf1 and pdf2 are the subsets of the original dataframes that are
>      # associated with a particular key
>      result = ... # some code that uses pdf1 and pdf2
>      return result
>
> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>
> I have searched around the problem and some people have suggested joining
> the tables first. However, it's often not the same pattern and it is hard
> to get it to work by using joins.
>
> I wonder what people's thoughts are on this?
>
> Li
>
>