You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/10/02 23:33:27 UTC

[beam] branch master updated: [BEAM-9547] Implement dot method for dataframes. (#12990)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0738042  [BEAM-9547] Implement dot method for dataframes. (#12990)
0738042 is described below

commit 0738042cd8c6e108ebeb0ea24581cd3a3e3b723f
Author: Robert Bradshaw <ro...@google.com>
AuthorDate: Fri Oct 2 16:32:58 2020 -0700

    [BEAM-9547] Implement dot method for dataframes. (#12990)
---
 sdks/python/apache_beam/dataframe/frames.py        | 71 +++++++++++++++++++++-
 .../apache_beam/dataframe/pandas_doctests_test.py  | 12 ++--
 2 files changed, 78 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 88b4b79..1d0a4b3 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -34,6 +34,45 @@ class DeferredSeries(frame_base.DeferredFrame):
 
   between = frame_base._elementwise_method('between')
 
+  def dot(self, other):
+    left = self._expr
+    if isinstance(other, DeferredSeries):
+      right = expressions.ComputedExpression(
+          'to_dataframe',
+          pd.DataFrame, [other._expr],
+          requires_partition_by=partitionings.Nothing(),
+          preserves_partition_by=partitionings.Index())
+      right_is_series = True
+    elif isinstance(other, DeferredDataFrame):
+      right = other._expr
+      right_is_series = False
+    else:
+      raise frame_base.WontImplementError('non-deferred result')
+
+    dots = expressions.ComputedExpression(
+        'dot',
+        # Transpose so we can sum across rows.
+        (lambda left, right: pd.DataFrame(left @ right).T),
+        [left, right],
+        requires_partition_by=partitionings.Index())
+    with expressions.allow_non_parallel_operations(True):
+      sums = expressions.ComputedExpression(
+          'sum',
+          lambda dots: dots.sum(),  #
+          [dots],
+          requires_partition_by=partitionings.Singleton())
+
+      if right_is_series:
+        result = expressions.ComputedExpression(
+            'extract',
+            lambda df: df[0], [sums],
+            requires_partition_by=partitionings.Singleton())
+      else:
+        result = sums
+      return frame_base.DeferredFrame.wrap(result)
+
+  __matmul__ = dot
+
   @frame_base.args_to_kwargs(pd.Series)
   @frame_base.populate_defaults(pd.Series)
   @frame_base.maybe_inplace
@@ -336,7 +375,6 @@ class DeferredDataFrame(frame_base.DeferredFrame):
   cov = frame_base.not_implemented_method('cov')
   corr = frame_base.not_implemented_method('corr')
   count = frame_base.not_implemented_method('count')
-  dot = frame_base.not_implemented_method('dot')
   drop = frame_base.not_implemented_method('drop')
   eval = frame_base.not_implemented_method('eval')
   reindex = frame_base.not_implemented_method('reindex')
@@ -415,6 +453,37 @@ class DeferredDataFrame(frame_base.DeferredFrame):
       'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
 
+  def dot(self, other):
+    # We want to broadcast the right hand side to all partitions of the left.
+    # This is OK, as its index must be the same size as the column set of self,
+    # so it cannot be too large.
+    class AsScalar(object):
+      def __init__(self, value):
+        self.value = value
+
+    if isinstance(other, frame_base.DeferredFrame):
+      proxy = other._expr.proxy()
+      with expressions.allow_non_parallel_operations():
+        side = expressions.ComputedExpression(
+            'as_scalar',
+            lambda df: AsScalar(df),
+            [other._expr],
+            requires_partition_by=partitionings.Singleton())
+    else:
+      proxy = pd.DataFrame(columns=range(len(other[0])))
+      side = expressions.ConstantExpression(AsScalar(other))
+
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'dot',
+            lambda left, right: left @ right.value,
+            [self._expr, side],
+            requires_partition_by=partitionings.Nothing(),
+            preserves_partition_by=partitionings.Index(),
+            proxy=proxy))
+
+  __matmul__ = dot
+
   head = tail = frame_base.wont_implement_method('order-sensitive')
 
   max = frame_base._agg_method('max')
diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
index abc2b7d..7b7e523 100644
--- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
+++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
@@ -104,7 +104,11 @@ class DoctestTest(unittest.TestCase):
             # Difficult to parallelize but should be possible?
             'pandas.core.frame.DataFrame.corr': ['*'],
             'pandas.core.frame.DataFrame.cov': ['*'],
-            'pandas.core.frame.DataFrame.dot': ['*'],
+            'pandas.core.frame.DataFrame.dot': [
+                # reindex not supported
+                's2 = s.reindex([1, 0, 2, 3])',
+                'df.dot(s2)',
+            ],
 
             # element-wise
             'pandas.core.frame.DataFrame.eval': ['*'],
@@ -161,8 +165,6 @@ class DoctestTest(unittest.TestCase):
                 "          ignore_index=True)"
             ],
             'pandas.core.frame.DataFrame.eval': ['df'],
-            # No override for __matmul__ and friends
-            'pandas.core.frame.DataFrame.dot': ['df @ other'],
             'pandas.core.frame.DataFrame.melt': [
                 "df.columns = [list('ABC'), list('DEF')]", "df"
             ],
@@ -210,6 +212,9 @@ class DoctestTest(unittest.TestCase):
             'pandas.core.series.Series.cumsum': ['*'],
             'pandas.core.series.Series.cumprod': ['*'],
             'pandas.core.series.Series.diff': ['*'],
+            'pandas.core.series.Series.dot': [
+                's.dot(arr)',  # non-deferred result
+            ],
             'pandas.core.series.Series.items': ['*'],
             'pandas.core.series.Series.iteritems': ['*'],
             # default keep is 'first'
@@ -248,7 +253,6 @@ class DoctestTest(unittest.TestCase):
             'pandas.core.series.Series.corr': ['*'],
             'pandas.core.series.Series.count': ['*'],
             'pandas.core.series.Series.cov': ['*'],
-            'pandas.core.series.Series.dot': ['*'],
             'pandas.core.series.Series.drop': ['*'],
             'pandas.core.series.Series.drop_duplicates': ['*'],
             'pandas.core.series.Series.duplicated': ['*'],