Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/19 21:56:49 UTC

[GitHub] [beam] robertwb opened a new pull request #13141: [BEAM-9547] Dataframe corrwith.

robertwb opened a new pull request #13141:
URL: https://github.com/apache/beam/pull/13141


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on pull request #13141: [BEAM-9547] Dataframe corrwith.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #13141:
URL: https://github.com/apache/beam/pull/13141#issuecomment-714847860


   I'm on call this week, so I probably won't be able to give this enough attention until next Tuesday.





[GitHub] [beam] robertwb commented on pull request #13141: [BEAM-9547] Dataframe corrwith.

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #13141:
URL: https://github.com/apache/beam/pull/13141#issuecomment-712481260


   R: @TheNeuralBit 





[GitHub] [beam] robertwb merged pull request #13141: [BEAM-9547] Dataframe corrwith.

Posted by GitBox <gi...@apache.org>.
robertwb merged pull request #13141:
URL: https://github.com/apache/beam/pull/13141


   





[GitHub] [beam] robertwb commented on a change in pull request #13141: [BEAM-9547] Dataframe corrwith.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #13141:
URL: https://github.com/apache/beam/pull/13141#discussion_r513666115



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -575,6 +575,8 @@ def __setitem__(self, key, value):
     else:
       raise NotImplementedError(key)
 
+  align = frame_base._elementwise_method('align')

Review comment:
       Good call. Done.

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -771,6 +773,62 @@ def fill_matrix(*args):
               requires_partition_by=partitionings.Singleton(),
               proxy=proxy))
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def corrwith(self, other, axis, **kwargs):
+    if axis not in (0, 'index'):
+      raise NotImplementedError('corrwith(axis=%r)' % axis)
+    if not isinstance(other, frame_base.DeferredFrame):
+      other = frame_base.DeferredFrame.wrap(
+          expressions.ConstantExpression(other))
+
+    if isinstance(other, DeferredSeries):
+      proxy = self._expr.proxy().corrwith(other._expr.proxy())
+      self, other = self.align(other, axis=0, join='inner')
+      corrs = [self[col].corr(other, **kwargs) for col in proxy.index]
+      def fill_dataframe(*args):
+        result = proxy.copy(deep=True)
+        for col, value in zip(proxy.index, args):
+          result[col] = value
+        return result
+      with expressions.allow_non_parallel_operations(True):
+        return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+            'fill_dataframe',
+            fill_dataframe,
+            [corr._expr for corr in corrs],
+            requires_partition_by=partitionings.Singleton(),
+            proxy=proxy))
+
+    elif isinstance(other, DeferredDataFrame):
+      proxy = self._expr.proxy().corrwith(other._expr.proxy())
+      self, other = self.align(other, axis=0, join='inner')
+      valid_cols = list(
+          set(self.columns)
+          .intersection(other.columns)
+          .intersection(proxy.index))
+      corrs = [self[col].corr(other[col], **kwargs) for col in valid_cols]
+      def fill_dataframe(*args):
+        result = proxy.copy(deep=True)
+        for col, value in zip(valid_cols, args):
+          result[col] = value
+        return result
+      with expressions.allow_non_parallel_operations(True):
+        return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+            'fill_dataframe',
+            fill_dataframe,
+            [corr._expr for corr in corrs],
+            requires_partition_by=partitionings.Singleton(),
+            proxy=proxy))

Review comment:
       I've consolidated them now.

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -771,6 +773,62 @@ def fill_matrix(*args):
               requires_partition_by=partitionings.Singleton(),
               proxy=proxy))
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def corrwith(self, other, axis, **kwargs):
+    if axis not in (0, 'index'):
+      raise NotImplementedError('corrwith(axis=%r)' % axis)
+    if not isinstance(other, frame_base.DeferredFrame):
+      other = frame_base.DeferredFrame.wrap(
+          expressions.ConstantExpression(other))
+
+    if isinstance(other, DeferredSeries):
+      proxy = self._expr.proxy().corrwith(other._expr.proxy())
+      self, other = self.align(other, axis=0, join='inner')
+      corrs = [self[col].corr(other, **kwargs) for col in proxy.index]

Review comment:
       Resolved. 







[GitHub] [beam] robertwb commented on pull request #13141: [BEAM-9547] Dataframe corrwith.

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #13141:
URL: https://github.com/apache/beam/pull/13141#issuecomment-718200546


   Run PythonLint PreCommit





[GitHub] [beam] TheNeuralBit commented on a change in pull request #13141: [BEAM-9547] Dataframe corrwith.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #13141:
URL: https://github.com/apache/beam/pull/13141#discussion_r512962676



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -575,6 +575,8 @@ def __setitem__(self, key, value):
     else:
       raise NotImplementedError(key)
 
+  align = frame_base._elementwise_method('align')

Review comment:
       Is this safe to implement with `elementwise_method`? (need to justify this to myself)
   
   I think it will work for most cases. The resulting expression will require `partitionings.Index`, since we require that whenever any arg is a DeferredBase, and `other` should always be one. That should handle axis='index'. All of the join modes should also work as intended within Index partitions.
   
   Actually, I think Index partitioning is _too_ restrictive for `axis='columns'`. We could do that without any partitioning. Is that right?
   
   Looking at the remaining args:
   - `copy`: I think copy=False won't work, since we can't predict whether the operation is in place; that depends on the data.
   - `fill_value`: Works trivially
   - `method`: non-default options are order-sensitive
   - `limit`: I don't think we can support this correctly right now without Singleton partitioning
   - `level`, `fill_axis`, `broadcast_axis`: I'm not actually sure what these are doing.
   
   Can we reject the options that won't work?
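
For illustration, rejecting the problematic options could look roughly like the sketch below. It is not the code from this PR: `check_align_kwargs` and `_ALIGN_REQUIRED_DEFAULTS` are hypothetical names, and the defaults (`copy=True`, `method=None`, `limit=None`) are assumed from the pandas `align` signature of the time. The remaining options (`level`, `fill_axis`, `broadcast_axis`) are passed through unchecked here because the comment above leaves their status open.

```python
# Hypothetical helper, not part of Beam: options that must keep their
# (assumed) pandas defaults for an elementwise align to be safe.
_ALIGN_REQUIRED_DEFAULTS = {"copy": True, "method": None, "limit": None}


def check_align_kwargs(**kwargs):
    """Raise NotImplementedError if a flagged option is not at its default.

    Returns the kwargs unchanged when every flagged option is acceptable.
    """
    for name, default in _ALIGN_REQUIRED_DEFAULTS.items():
        value = kwargs.get(name, default)
        if value != default:
            # Same message style as the corrwith(axis=...) check in the diff.
            raise NotImplementedError("align(%s=%r)" % (name, value))
    return kwargs
```

A call such as `check_align_kwargs(axis=0, join='inner')` passes through untouched, while `check_align_kwargs(method='ffill')` raises.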

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -771,6 +773,62 @@ def fill_matrix(*args):
               requires_partition_by=partitionings.Singleton(),
               proxy=proxy))
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def corrwith(self, other, axis, **kwargs):
+    if axis not in (0, 'index'):
+      raise NotImplementedError('corrwith(axis=%r)' % axis)
+    if not isinstance(other, frame_base.DeferredFrame):
+      other = frame_base.DeferredFrame.wrap(
+          expressions.ConstantExpression(other))
+
+    if isinstance(other, DeferredSeries):
+      proxy = self._expr.proxy().corrwith(other._expr.proxy())
+      self, other = self.align(other, axis=0, join='inner')
+      corrs = [self[col].corr(other, **kwargs) for col in proxy.index]

Review comment:
       It looks like `method` is actually the only `kwarg` that overlaps between corr and corrwith. We should probably pass that explicitly, and handle the remaining arg, `drop`, here.
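
The overlap can be checked against plain pandas with `inspect`, and `drop` can then be handled as a separate step after the per-column correlations. The sketch below is an assumption-laden illustration, not the change made in this PR: `apply_drop` is a hypothetical helper, and the reindex-to-all-columns behaviour for `drop=False` reflects my reading of pandas `corrwith`, which may order the result index differently.

```python
import inspect

import pandas as pd

# Parameters accepted by each method, ignoring the operands themselves.
corr_params = set(inspect.signature(pd.Series.corr).parameters) - {"self", "other"}
corrwith_params = (
    set(inspect.signature(pd.DataFrame.corrwith).parameters) - {"self", "other"})
# Only the shared names are safe to forward from corrwith to corr.
overlap = corr_params & corrwith_params


def apply_drop(correl, all_columns, drop=False):
    """Hypothetical post-step: with drop=False, columns that have no
    counterpart show up as NaN entries; with drop=True they are omitted."""
    if drop:
        return correl
    return correl.reindex(all_columns)
```

With this split, the per-column call would receive only `method=method`, and `drop` never reaches `Series.corr`.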

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -771,6 +773,62 @@ def fill_matrix(*args):
               requires_partition_by=partitionings.Singleton(),
               proxy=proxy))
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def corrwith(self, other, axis, **kwargs):
+    if axis not in (0, 'index'):
+      raise NotImplementedError('corrwith(axis=%r)' % axis)
+    if not isinstance(other, frame_base.DeferredFrame):
+      other = frame_base.DeferredFrame.wrap(
+          expressions.ConstantExpression(other))
+
+    if isinstance(other, DeferredSeries):
+      proxy = self._expr.proxy().corrwith(other._expr.proxy())
+      self, other = self.align(other, axis=0, join='inner')
+      corrs = [self[col].corr(other, **kwargs) for col in proxy.index]
+      def fill_dataframe(*args):
+        result = proxy.copy(deep=True)
+        for col, value in zip(proxy.index, args):
+          result[col] = value
+        return result
+      with expressions.allow_non_parallel_operations(True):
+        return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+            'fill_dataframe',
+            fill_dataframe,
+            [corr._expr for corr in corrs],
+            requires_partition_by=partitionings.Singleton(),
+            proxy=proxy))
+
+    elif isinstance(other, DeferredDataFrame):
+      proxy = self._expr.proxy().corrwith(other._expr.proxy())
+      self, other = self.align(other, axis=0, join='inner')
+      valid_cols = list(
+          set(self.columns)
+          .intersection(other.columns)
+          .intersection(proxy.index))
+      corrs = [self[col].corr(other[col], **kwargs) for col in valid_cols]
+      def fill_dataframe(*args):
+        result = proxy.copy(deep=True)
+        for col, value in zip(valid_cols, args):
+          result[col] = value
+        return result
+      with expressions.allow_non_parallel_operations(True):
+        return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+            'fill_dataframe',
+            fill_dataframe,
+            [corr._expr for corr in corrs],
+            requires_partition_by=partitionings.Singleton(),
+            proxy=proxy))

Review comment:
       nit: The two branches here are almost identical; the only difference is `valid_cols` vs. `proxy.index`. You might consider reworking this so the rest of the logic is shared. That might just make it more confusing though... up to you.
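
One possible shape for the shared logic, sketched on eager pandas objects rather than Beam's deferred frames: the branches differ only in which columns are used and what each column is correlated against, so those two choices can be made up front. `corrwith_sketch` and `rhs` are hypothetical names, and this is not necessarily how the PR was consolidated.

```python
import pandas as pd


def corrwith_sketch(df, other, method="pearson"):
    """Eager-pandas illustration of sharing the Series/DataFrame branches."""
    if isinstance(other, pd.Series):
        # Every column of df is correlated against the same Series.
        cols = list(df.columns)

        def rhs(col):
            return other
    else:
        # Only columns present in both frames can be correlated.
        cols = [c for c in df.columns if c in other.columns]

        def rhs(col):
            return other[col]

    # Shared part: one correlation per column, then assemble the result.
    corrs = [df[col].corr(rhs(col), method=method) for col in cols]
    return pd.Series(corrs, index=cols, dtype=float)
```

In the deferred version the final assembly would still go through a single `fill_dataframe`-style expression; only the column list and the right-hand operand vary.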

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -771,6 +773,62 @@ def fill_matrix(*args):
               requires_partition_by=partitionings.Singleton(),
               proxy=proxy))
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def corrwith(self, other, axis, **kwargs):
+    if axis not in (0, 'index'):
+      raise NotImplementedError('corrwith(axis=%r)' % axis)
+    if not isinstance(other, frame_base.DeferredFrame):
+      other = frame_base.DeferredFrame.wrap(
+          expressions.ConstantExpression(other))
+
+    if isinstance(other, DeferredSeries):
+      proxy = self._expr.proxy().corrwith(other._expr.proxy())
+      self, other = self.align(other, axis=0, join='inner')
+      corrs = [self[col].corr(other, **kwargs) for col in proxy.index]
+      def fill_dataframe(*args):
+        result = proxy.copy(deep=True)
+        for col, value in zip(proxy.index, args):
+          result[col] = value
+        return result
+      with expressions.allow_non_parallel_operations(True):
+        return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+            'fill_dataframe',
+            fill_dataframe,
+            [corr._expr for corr in corrs],
+            requires_partition_by=partitionings.Singleton(),
+            proxy=proxy))
+
+    elif isinstance(other, DeferredDataFrame):
+      proxy = self._expr.proxy().corrwith(other._expr.proxy())
+      self, other = self.align(other, axis=0, join='inner')
+      valid_cols = list(
+          set(self.columns)
+          .intersection(other.columns)
+          .intersection(proxy.index))
+      corrs = [self[col].corr(other[col], **kwargs) for col in valid_cols]
+      def fill_dataframe(*args):

Review comment:
       ```suggestion
         # Generate expressions to compute the actual correlations
         corrs = [self[col].corr(other[col], **kwargs) for col in valid_cols]
         # Combine the results
         def fill_dataframe(*args):
   ```
   
   It took me a while to realize this is what was going on, hopefully this will expedite it for future readers.



