You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/10/02 23:33:27 UTC
[beam] branch master updated: [BEAM-9547] Implement dot method for dataframes. (#12990)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0738042 [BEAM-9547] Implement dot method for dataframes. (#12990)
0738042 is described below
commit 0738042cd8c6e108ebeb0ea24581cd3a3e3b723f
Author: Robert Bradshaw <ro...@google.com>
AuthorDate: Fri Oct 2 16:32:58 2020 -0700
[BEAM-9547] Implement dot method for dataframes. (#12990)
---
sdks/python/apache_beam/dataframe/frames.py | 71 +++++++++++++++++++++-
.../apache_beam/dataframe/pandas_doctests_test.py | 12 ++--
2 files changed, 78 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 88b4b79..1d0a4b3 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -34,6 +34,45 @@ class DeferredSeries(frame_base.DeferredFrame):
between = frame_base._elementwise_method('between')
+ def dot(self, other):
+ left = self._expr
+ if isinstance(other, DeferredSeries):
+ right = expressions.ComputedExpression(
+ 'to_dataframe',
+ pd.DataFrame, [other._expr],
+ requires_partition_by=partitionings.Nothing(),
+ preserves_partition_by=partitionings.Index())
+ right_is_series = True
+ elif isinstance(other, DeferredDataFrame):
+ right = other._expr
+ right_is_series = False
+ else:
+ raise frame_base.WontImplementError('non-deferred result')
+
+ dots = expressions.ComputedExpression(
+ 'dot',
+ # Transpose so we can sum across rows.
+ (lambda left, right: pd.DataFrame(left @ right).T),
+ [left, right],
+ requires_partition_by=partitionings.Index())
+ with expressions.allow_non_parallel_operations(True):
+ sums = expressions.ComputedExpression(
+ 'sum',
+ lambda dots: dots.sum(), #
+ [dots],
+ requires_partition_by=partitionings.Singleton())
+
+ if right_is_series:
+ result = expressions.ComputedExpression(
+ 'extract',
+ lambda df: df[0], [sums],
+ requires_partition_by=partitionings.Singleton())
+ else:
+ result = sums
+ return frame_base.DeferredFrame.wrap(result)
+
+ __matmul__ = dot
+
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.maybe_inplace
@@ -336,7 +375,6 @@ class DeferredDataFrame(frame_base.DeferredFrame):
cov = frame_base.not_implemented_method('cov')
corr = frame_base.not_implemented_method('corr')
count = frame_base.not_implemented_method('count')
- dot = frame_base.not_implemented_method('dot')
drop = frame_base.not_implemented_method('drop')
eval = frame_base.not_implemented_method('eval')
reindex = frame_base.not_implemented_method('reindex')
@@ -415,6 +453,37 @@ class DeferredDataFrame(frame_base.DeferredFrame):
'order-sensitive')
diff = frame_base.wont_implement_method('order-sensitive')
+ def dot(self, other):
+ # We want to broadcast the right hand side to all partitions of the left.
+ # This is OK, as its index must be the same size as the columns set of self,
+ # so cannot be too large.
+ class AsScalar(object):
+ def __init__(self, value):
+ self.value = value
+
+ if isinstance(other, frame_base.DeferredFrame):
+ proxy = other._expr.proxy()
+ with expressions.allow_non_parallel_operations():
+ side = expressions.ComputedExpression(
+ 'as_scalar',
+ lambda df: AsScalar(df),
+ [other._expr],
+ requires_partition_by=partitionings.Singleton())
+ else:
+ proxy = pd.DataFrame(columns=range(len(other[0])))
+ side = expressions.ConstantExpression(AsScalar(other))
+
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'dot',
+ lambda left, right: left @ right.value,
+ [self._expr, side],
+ requires_partition_by=partitionings.Nothing(),
+ preserves_partition_by=partitionings.Index(),
+ proxy=proxy))
+
+ __matmul__ = dot
+
head = tail = frame_base.wont_implement_method('order-sensitive')
max = frame_base._agg_method('max')
diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
index abc2b7d..7b7e523 100644
--- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
+++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
@@ -104,7 +104,11 @@ class DoctestTest(unittest.TestCase):
# Difficult to parallelize but should be possible?
'pandas.core.frame.DataFrame.corr': ['*'],
'pandas.core.frame.DataFrame.cov': ['*'],
- 'pandas.core.frame.DataFrame.dot': ['*'],
+ 'pandas.core.frame.DataFrame.dot': [
+ # reindex not supported
+ 's2 = s.reindex([1, 0, 2, 3])',
+ 'df.dot(s2)',
+ ],
# element-wise
'pandas.core.frame.DataFrame.eval': ['*'],
@@ -161,8 +165,6 @@ class DoctestTest(unittest.TestCase):
" ignore_index=True)"
],
'pandas.core.frame.DataFrame.eval': ['df'],
- # No override for __matmul__ and friends
- 'pandas.core.frame.DataFrame.dot': ['df @ other'],
'pandas.core.frame.DataFrame.melt': [
"df.columns = [list('ABC'), list('DEF')]", "df"
],
@@ -210,6 +212,9 @@ class DoctestTest(unittest.TestCase):
'pandas.core.series.Series.cumsum': ['*'],
'pandas.core.series.Series.cumprod': ['*'],
'pandas.core.series.Series.diff': ['*'],
+ 'pandas.core.series.Series.dot': [
+ 's.dot(arr)', # non-deferred result
+ ],
'pandas.core.series.Series.items': ['*'],
'pandas.core.series.Series.iteritems': ['*'],
# default keep is 'first'
@@ -248,7 +253,6 @@ class DoctestTest(unittest.TestCase):
'pandas.core.series.Series.corr': ['*'],
'pandas.core.series.Series.count': ['*'],
'pandas.core.series.Series.cov': ['*'],
- 'pandas.core.series.Series.dot': ['*'],
'pandas.core.series.Series.drop': ['*'],
'pandas.core.series.Series.drop_duplicates': ['*'],
'pandas.core.series.Series.duplicated': ['*'],