Posted to issues@beam.apache.org by "Rogelio Miguel Hernandez Sandoval (Jira)" <ji...@apache.org> on 2021/10/20 22:32:00 UTC

[jira] [Commented] (BEAM-12560) Implement idxmin and idxmax for DataFrame, Series, and GroupBy

    [ https://issues.apache.org/jira/browse/BEAM-12560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432116#comment-17432116 ] 

Rogelio Miguel Hernandez Sandoval commented on BEAM-12560:
----------------------------------------------------------

Hi [~bhulette], I'm working on this task and have some questions I hope you can shed some light on.

I'm trying to declare {{idxmin}} inside {{DeferredSeries}} as shown below. I know this is naive, but I tried it as a starting point to see how this works.
{code:python}
@frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
def idxmin(self, **kwargs):
    with expressions.allow_non_parallel_operations(True):
        return frame_base.DeferredFrame.wrap(
            expressions.ComputedExpression(
                'idxmin',
                lambda s: s.idxmin(**kwargs),
                [self._expr],
                requires_partition_by=partitionings.Singleton(),
                preserves_partition_by=partitionings.Singleton()))
{code}

It tries to execute the {{idxmin}} function on the proxy, which is an empty Series, and that causes an error at this [line|https://github.com/roger-mike/beam/blob/ffde2a60205d920749787a67d64a0f2af199ba35/sdks/python/apache_beam/dataframe/expressions.py#L359] of *expressions.py*:
{code:python}
proxy = func(*(arg.proxy() for arg in args))
{code}
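The failure can be reproduced in plain pandas. This is a minimal sketch that assumes, as the line above suggests, that a proxy is an empty object carrying only dtype and index information and that the expression's function is called on it directly; {{compute_proxy}} is a hypothetical helper, not part of Beam.
{code:python}
import pandas as pd


def compute_proxy(func, *proxies):
    # Hypothetical stand-in for the proxy step in expressions.py: call the
    # expression's function on the (empty) proxies of its inputs.
    return func(*proxies)


# A proxy carries dtype and index metadata but holds no rows.
proxy = pd.Series([], dtype='float64', index=pd.Index([], dtype='object'))

try:
    compute_proxy(lambda s: s.idxmin(), proxy)
    proxy_error = None
except ValueError as e:
    # pandas cannot take the argmin of an empty sequence.
    proxy_error = e

print(repr(proxy_error))
{code}
Functions such as {{sum}} return a well-defined value on empty input, so their proxies can be computed this way; {{idxmin}} has no answer for an empty Series, so pandas raises instead.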

Then I tried something like the following. As I understand it, this is the way to apply a proxy function to each partition and then apply another function to the combined results.
{code:python}
@frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
def idxmin(self, **kwargs):
    # Per-partition step. The append is only an experiment: it adds a dummy
    # row so the proxy passed to the combine step is never empty.
    idx_min = expressions.ComputedExpression(
        'idx_min',
        # lambda s: s,
        lambda s: s.append(pd.Series([999999], index=['G'])),
        [self._expr],
        requires_partition_by=partitionings.Index(),
        preserves_partition_by=partitionings.Singleton(),
    )
    # Combine step: compute idxmin over all of the data in a single partition.
    with expressions.allow_non_parallel_operations(True):
        return frame_base.DeferredFrame.wrap(
            expressions.ComputedExpression(
                'combine_idxmin',
                lambda s: s.idxmin(**kwargs),
                [idx_min],
                requires_partition_by=partitionings.Singleton(),
                preserves_partition_by=partitionings.Singleton()))
{code}
So far, my understanding is that the lambda passed to {{ComputedExpression}} is what generates that expression's result.

I tried {{lambda s: s}} to return each partition unchanged, but it throws the same empty Series error. When I used {{lambda s: s.append(pd.Series([999999], index=['G']))}} (just as an experiment), the lambda seemed to return a valid Series, and because the appended value is never the minimum in the test data, {{idxmin}} never selects it, so all the tests even pass.
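Whether the dummy value affects the result depends only on the data. A small pandas-only sketch of the same experiment shows a case where it does not and a case where it does. The helper name {{idxmin_with_sentinel}} is hypothetical, and {{pd.concat}} replaces {{Series.append}}, which was deprecated in pandas 1.4 and removed in 2.0.
{code:python}
import pandas as pd


def idxmin_with_sentinel(s):
    # Reproduces the experiment: add a dummy 999999 value labelled 'G',
    # then take idxmin over the padded Series.
    padded = pd.concat([s, pd.Series([999999], index=['G'])])
    return padded.idxmin()


small = pd.Series([3, 1, 2], index=['a', 'b', 'c'])
large = pd.Series([2_000_000, 1_000_000], index=['a', 'b'])

print(idxmin_with_sentinel(small))  # b: the dummy value is not the minimum
print(idxmin_with_sentinel(large))  # G: the dummy value wins
{code}
For {{idxmax}} the same trick would fail whenever every real value is below 999999.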

My questions are:
- Could you explain how these proxy functions work?
- How can I use them to apply a function to each partition and then join their results?
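For the second question, one way to picture the pattern in plain pandas is a two-step reduction: each partition shrinks to a small partial result, and a single combine step reduces the partials. This is a sketch under assumptions, not Beam's implementation: the three-way split stands in for Beam's partitioning, and {{partial_idxmin}} and {{combine_idxmin}} are hypothetical names.
{code:python}
import pandas as pd


def partial_idxmin(part):
    # Per-partition step: keep the single row holding this partition's
    # smallest value (NaNs are skipped). nsmallest returns an empty Series
    # for empty input instead of raising.
    return part.nsmallest(1)


def combine_idxmin(partials):
    # Combine step, which would have to run on a single partition:
    # concatenate the per-partition winners and take idxmin over them.
    return pd.concat(partials).idxmin()


s = pd.Series([5, 3, 8, 1, 7, 4], index=list('abcdef'))

# Hypothetical partitioning: split the Series into three chunks.
partitions = [s.iloc[0:2], s.iloc[2:4], s.iloc[4:6]]

partials = [partial_idxmin(p) for p in partitions]
print(combine_idxmin(partials))  # d, same as s.idxmin()
{code}
Because {{nsmallest(1)}} returns an empty Series for empty input, the per-partition step also works on an empty proxy, unlike a bare {{idxmin}}. The combine step still calls {{idxmin}}, so it would still fail if evaluated on an empty proxy; this sketch does not address how that step's proxy should be produced.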

Thanks for your help. [~benglez] and [~AlexRodriguez] may also find this useful.

> Implement idxmin and idxmax for DataFrame, Series, and GroupBy
> --------------------------------------------------------------
>
>                 Key: BEAM-12560
>                 URL: https://issues.apache.org/jira/browse/BEAM-12560
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-dataframe
>            Reporter: Brian Hulette
>            Assignee: Rogelio Miguel Hernandez Sandoval
>            Priority: P3
>
> Add an implementation of [idxmin|https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.idxmin.html] and [idxmax|https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.idxmax.html] for DeferredDataFrame, DeferredSeries, and DeferredGroupBy. It should be fully unit tested with some combination of pandas_doctests_test.py and frames_test.py.
> https://github.com/apache/beam/pull/14274 is an example of a typical PR that adds new operations. See https://lists.apache.org/thread.html/r8ffe96d756054610dfdb4e849ffc6a741e826d15ba7e5bdeee1b4266%40%3Cdev.beam.apache.org%3E for background on the DataFrame API.
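As a reference for the tests the issue asks for, this is what plain pandas returns for the three cases it names; the deferred versions would be expected to match. The data is made up for illustration.
{code:python}
import pandas as pd

# Made-up data: two groups ('x', 'y') and two numeric columns.
df = pd.DataFrame(
    {'key': ['x', 'x', 'y', 'y'],
     'value': [3, 1, 4, 2],
     'other': [0, 5, 9, 2]},
    index=['a', 'b', 'c', 'd'])

# Series: a single index label.
print(df['value'].idxmin())  # b
print(df['value'].idxmax())  # c

# DataFrame: one label per column (axis=0, the default).
print(df[['value', 'other']].idxmin())  # value -> b, other -> a

# GroupBy: one label per group.
print(df.groupby('key')['value'].idxmin())  # x -> b, y -> d
{code}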



--
This message was sent by Atlassian Jira
(v8.3.4#803005)