Posted to commits@beam.apache.org by lo...@apache.org on 2020/11/24 23:04:22 UTC
[beam] branch release-2.26.0 updated: [release-2.26.0][BEAM-11324]
Fix some partitioning errors that can lead to execution-time errors
(#13418)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch release-2.26.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.26.0 by this push:
new 3efb955 [release-2.26.0][BEAM-11324] Fix some partitioning errors that can lead to execution-time errors (#13418)
3efb955 is described below
commit 3efb95516f5943283aae2ca122e20936fe5f4eae
Author: Brian Hulette <bh...@google.com>
AuthorDate: Tue Nov 24 15:03:37 2020 -0800
[release-2.26.0][BEAM-11324] Fix some partitioning errors that can lead to execution-time errors (#13418)
* set_index does not preserve partitioning (#13398)
* [BEAM-11324] Add additional verification in PartitioningSession (#13401)
* Default to distributed=True in frames_test.py:
* Add additional partitioning verification to PartitioningSession
* Series pre-agg doesn't preserve partitioning
* reset_index doesn't preserve partitioning
* remove whitespace
* store and recover random state, clarify loop
* add TODO
* map_index doesn't preserve partitioning
* lint
* lint
* lint
* Disallow grouping by a series
* lint
* lint
* import order
* fix whitespace merge error
---
sdks/python/apache_beam/dataframe/expressions.py | 92 ++++++++++++-----
sdks/python/apache_beam/dataframe/frames.py | 32 ++----
sdks/python/apache_beam/dataframe/frames_test.py | 115 ++++++++++-----------
.../apache_beam/dataframe/pandas_doctests_test.py | 2 +
sdks/python/apache_beam/dataframe/partitionings.py | 26 +++++
5 files changed, 155 insertions(+), 112 deletions(-)
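The "set_index does not preserve partitioning" and "reset_index doesn't preserve partitioning" fixes above both follow from the same observation: these operations replace the index, so data partitioned by the old index is in general scattered across partitions with respect to the new one. A minimal pandas sketch of the hazard (not from the commit; the two-way split is a hypothetical stand-in for hash partitioning by index):

    import pandas as pd

    df = pd.DataFrame({'k': ['a', 'a', 'b', 'b'], 'v': [1, 2, 3, 4]})

    # Fake two index partitions by splitting on even/odd index values.
    part0 = df[df.index % 2 == 0]  # rows 0, 2
    part1 = df[df.index % 2 == 1]  # rows 1, 3

    # After set_index('k'), the partitions share index values, so the
    # result is no longer partitioned by its (new) index. A downstream
    # operation that requires index partitioning would see incomplete
    # groups at execution time.
    new0, new1 = part0.set_index('k'), part1.set_index('k')
    print(set(new0.index) & set(new1.index))  # {'a', 'b'}: overlap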
diff --git a/sdks/python/apache_beam/dataframe/expressions.py b/sdks/python/apache_beam/dataframe/expressions.py
index 8697eaf..e275795 100644
--- a/sdks/python/apache_beam/dataframe/expressions.py
+++ b/sdks/python/apache_beam/dataframe/expressions.py
@@ -17,6 +17,7 @@
from __future__ import absolute_import
import contextlib
+import random
import threading
from typing import Any
from typing import Callable
@@ -48,10 +49,15 @@ class Session(object):
class PartitioningSession(Session):
"""An extension of Session that enforces actual partitioning of inputs.
- When evaluating an expression, inputs are partitioned according to its
- `requires_partition_by` specifications, the expression is evaluated on each
- partition separately, and the final result concatinated, as if this were
- actually executed in a parallel manner.
+ Each expression is evaluated multiple times for various supported
+ partitionings determined by its `requires_partition_by` specification. For
+ each tested partitioning, the input is partitioned and the expression is
+ evaluated on each partition separately, as if this were actually executed in
+ a parallel manner.
+
+ For each input partitioning, the results are verified to be partitioned
+ appropriately according to the expression's `preserves_partition_by`
+ specification.
For testing only.
"""
@@ -67,28 +73,62 @@ class PartitioningSession(Session):
result = super(PartitioningSession, self).evaluate(expr)
else:
scaler_args = [arg for arg in expr.args() if is_scalar(arg)]
- parts = collections.defaultdict(
- lambda: Session({arg: self.evaluate(arg)
- for arg in scaler_args}))
- for arg in expr.args():
- if not is_scalar(arg):
- input = self.evaluate(arg)
- for key, part in expr.requires_partition_by().test_partition_fn(
- input):
- parts[key]._bindings[arg] = part
- if not parts:
- parts[None] # Create at least one entry.
-
- results = []
- for session in parts.values():
- if any(len(session.lookup(arg)) for arg in expr.args()
- if not is_scalar(arg)):
- results.append(session.evaluate(expr))
- if results:
- result = pd.concat(results)
- else:
- # Choose any single session.
- result = next(iter(parts.values())).evaluate(expr)
+
+ def evaluate_with(input_partitioning):
+ parts = collections.defaultdict(
+ lambda: Session({arg: self.evaluate(arg)
+ for arg in scaler_args}))
+ for arg in expr.args():
+ if not is_scalar(arg):
+ input = self.evaluate(arg)
+ for key, part in input_partitioning.test_partition_fn(input):
+ parts[key]._bindings[arg] = part
+ if not parts:
+ parts[None] # Create at least one entry.
+
+ results = []
+ for session in parts.values():
+ if any(len(session.lookup(arg)) for arg in expr.args()
+ if not is_scalar(arg)):
+ results.append(session.evaluate(expr))
+
+ expected_output_partitioning = expr.preserves_partition_by(
+ ) if input_partitioning.is_subpartitioning_of(
+ expr.preserves_partition_by()) else input_partitioning
+
+ if not expected_output_partitioning.check(results):
+ raise AssertionError(
+ f"""Expression does not preserve partitioning!
+ Expression: {expr}
+ Requires: {expr.requires_partition_by()}
+ Preserves: {expr.preserves_partition_by()}
+ Input partitioning: {input_partitioning}
+ Expected output partitioning: {expected_output_partitioning}
+ """)
+
+ if results:
+ return pd.concat(results)
+ else:
+ # Choose any single session.
+ return next(iter(parts.values())).evaluate(expr)
+
+ # Store random state so it can be re-used for each execution, in case
+ # the expression is part of a test that relies on the random seed.
+ random_state = random.getstate()
+
+ for input_partitioning in set([expr.requires_partition_by(),
+ partitionings.Nothing(),
+ partitionings.Index(),
+ partitionings.Singleton()]):
+ if not input_partitioning.is_subpartitioning_of(
+ expr.requires_partition_by()):
+ continue
+
+ random.setstate(random_state)
+
+ # TODO(BEAM-11324): Consider verifying result is always the same
+ result = evaluate_with(input_partitioning)
+
self._bindings[expr] = result
return self._bindings[expr]
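The new evaluate_with helper above partitions every non-scalar input, evaluates the expression on each partition, verifies the per-partition outputs against the expected output partitioning, and only then concatenates. A simplified standalone sketch of that control flow (not the Beam API; evaluate_partitioned, split_fn and check_fn are hypothetical stand-ins for evaluate_with, Partitioning.test_partition_fn and the new Partitioning.check):

    import pandas as pd

    def evaluate_partitioned(fn, df, split_fn, check_fn):
        # Split the input into partitions, as a distributed runner would.
        parts = [part for _, part in split_fn(df)]
        # Evaluate the expression on each non-empty partition independently.
        results = [fn(part) for part in parts if len(part)]
        # Verify the per-partition outputs respect the claimed output
        # partitioning before concatenating them into the final result.
        if not check_fn(results):
            raise AssertionError('Expression does not preserve partitioning!')
        return pd.concat(results)

    # Hypothetical usage: two-way split by index parity, checked for
    # disjoint output indexes (the Index partitioning invariant).
    split = lambda df: enumerate([df[df.index % 2 == 0], df[df.index % 2 == 1]])
    disjoint = lambda dfs: all(
        set(a.index).isdisjoint(b.index)
        for i, a in enumerate(dfs) for b in dfs[i + 1:])

    df = pd.DataFrame({'v': range(6)})
    print(evaluate_partitioned(lambda part: part * 2, df, split, disjoint))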
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 4c3fb68..f27533f 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -137,29 +137,13 @@ class DeferredDataFrameOrSeries(frame_base.DeferredFrame):
'map_index',
map_index, [self._expr],
requires_partition_by=partitionings.Nothing(),
- preserves_partition_by=partitionings.Singleton())
+ preserves_partition_by=partitionings.Nothing())
elif isinstance(by, DeferredSeries):
- if isinstance(self, DeferredSeries):
-
- def set_index(s, by):
- df = pd.DataFrame(s)
- df, by = df.align(by, axis=0, join='inner')
- return df.set_index(by).iloc[:, 0]
-
- else:
-
- def set_index(df, by): # type: ignore
- df, by = df.align(by, axis=0, join='inner')
- return df.set_index(by)
-
- to_group = expressions.ComputedExpression(
- 'set_index',
- set_index, #
- [self._expr, by._expr],
- requires_partition_by=partitionings.Index(),
- preserves_partition_by=partitionings.Singleton())
+ raise NotImplementedError(
+ "grouping by a Series is not yet implemented. You can group by a "
+ "DataFrame column by specifying its name.")
elif isinstance(by, np.ndarray):
raise frame_base.WontImplementError('order sensitive')
@@ -467,9 +451,11 @@ class DeferredSeries(DeferredDataFrameOrSeries):
# We're only handling a single column.
base_func = func[0] if isinstance(func, list) else func
if _is_associative(base_func) and not args and not kwargs:
- intermediate = expressions.elementwise_expression(
+ intermediate = expressions.ComputedExpression(
'pre_aggregate',
- lambda s: s.agg([base_func], *args, **kwargs), [self._expr])
+ lambda s: s.agg([base_func], *args, **kwargs), [self._expr],
+ requires_partition_by=partitionings.Nothing(),
+ preserves_partition_by=partitionings.Nothing())
allow_nonparallel_final = True
else:
intermediate = self._expr
@@ -1377,7 +1363,7 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
'reset_index',
lambda df: df.reset_index(level=level, **kwargs),
[self._expr],
- preserves_partition_by=partitionings.Singleton(),
+ preserves_partition_by=partitionings.Nothing(),
requires_partition_by=requires_partition_by))
round = frame_base._elementwise_method('round')
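The pre_aggregate change above is valid under requires_partition_by=partitionings.Nothing() because base_func is associative: applying it within arbitrary partitions and then re-aggregating the partial results gives the same answer as a single global aggregation. A quick pandas illustration of that property (not from the commit):

    import pandas as pd

    s = pd.Series([3, 1, 4, 1, 5, 9, 2, 6])

    # Split into arbitrary partitions, ignoring the index entirely.
    parts = [s[:3], s[3:5], s[5:]]

    # Per-partition pre-aggregation followed by a final aggregation...
    partials = pd.Series([part.agg(sum) for part in parts])
    # ...matches aggregating the whole series at once: 8 + 6 + 17 == 31.
    assert partials.agg(sum) == s.agg(sum)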
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index ef9281d..79a38230 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -30,7 +30,7 @@ from apache_beam.dataframe import frames # pylint: disable=unused-import
class DeferredFrameTest(unittest.TestCase):
- def _run_test(self, func, *args, distributed=False):
+ def _run_test(self, func, *args, distributed=True):
deferred_args = [
frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(arg, arg[0:0])) for arg in args
@@ -93,42 +93,36 @@ class DeferredFrameTest(unittest.TestCase):
'group': ['a' if i % 5 == 0 or i % 3 == 0 else 'b' for i in range(100)],
'value': [None if i % 11 == 0 else i for i in range(100)]
})
- self._run_test(
- lambda df: df.groupby('group').agg(sum), df, distributed=True)
- self._run_test(lambda df: df.groupby('group').sum(), df, distributed=True)
- self._run_test(
- lambda df: df.groupby('group').median(), df, distributed=True)
- self._run_test(lambda df: df.groupby('group').size(), df, distributed=True)
- self._run_test(lambda df: df.groupby('group').count(), df, distributed=True)
- self._run_test(lambda df: df.groupby('group').max(), df, distributed=True)
- self._run_test(lambda df: df.groupby('group').min(), df, distributed=True)
- self._run_test(lambda df: df.groupby('group').mean(), df, distributed=True)
+ self._run_test(lambda df: df.groupby('group').agg(sum), df)
+ self._run_test(lambda df: df.groupby('group').sum(), df)
+ self._run_test(lambda df: df.groupby('group').median(), df)
+ self._run_test(lambda df: df.groupby('group').size(), df)
+ self._run_test(lambda df: df.groupby('group').count(), df)
+ self._run_test(lambda df: df.groupby('group').max(), df)
+ self._run_test(lambda df: df.groupby('group').min(), df)
+ self._run_test(lambda df: df.groupby('group').mean(), df)
+
+ self._run_test(lambda df: df[df.value > 30].groupby('group').sum(), df)
+ self._run_test(lambda df: df[df.value > 30].groupby('group').mean(), df)
+ self._run_test(lambda df: df[df.value > 30].groupby('group').size(), df)
+
+ # Grouping by a series is not currently supported
+ #self._run_test(lambda df: df[df.value > 40].groupby(df.group).sum(), df)
+ #self._run_test(lambda df: df[df.value > 40].groupby(df.group).mean(), df)
+ #self._run_test(lambda df: df[df.value > 40].groupby(df.group).size(), df)
+
+ # Example from https://pandas.pydata.org/docs/user_guide/groupby.html
+ arrays = [['bar', 'bar', 'baz', 'baz', 'foo', 'foo', 'qux', 'qux'],
+ ['one', 'two', 'one', 'two', 'one', 'two', 'one', 'two']]
+
+ index = pd.MultiIndex.from_arrays(arrays, names=['first', 'second'])
- self._run_test(
- lambda df: df[df.value > 30].groupby('group').sum(),
- df,
- distributed=True)
- self._run_test(
- lambda df: df[df.value > 30].groupby('group').mean(),
- df,
- distributed=True)
- self._run_test(
- lambda df: df[df.value > 30].groupby('group').size(),
- df,
- distributed=True)
+ df = pd.DataFrame({
+ 'A': [1, 1, 1, 1, 2, 2, 3, 3], 'B': np.arange(8)
+ },
+ index=index)
- self._run_test(
- lambda df: df[df.value > 40].groupby(df.group).sum(),
- df,
- distributed=True)
- self._run_test(
- lambda df: df[df.value > 40].groupby(df.group).mean(),
- df,
- distributed=True)
- self._run_test(
- lambda df: df[df.value > 40].groupby(df.group).size(),
- df,
- distributed=True)
+ self._run_test(lambda df: df.groupby(['second', 'A']).sum(), df)
@unittest.skipIf(sys.version_info <= (3, ), 'differing signature')
def test_merge(self):
@@ -160,24 +154,24 @@ class DeferredFrameTest(unittest.TestCase):
def test_series_getitem(self):
s = pd.Series([x**2 for x in range(10)])
- self._run_test(lambda s: s[...], s, distributed=True)
- self._run_test(lambda s: s[:], s, distributed=True)
- self._run_test(lambda s: s[s < 10], s, distributed=True)
- self._run_test(lambda s: s[lambda s: s < 10], s, distributed=True)
+ self._run_test(lambda s: s[...], s)
+ self._run_test(lambda s: s[:], s)
+ self._run_test(lambda s: s[s < 10], s)
+ self._run_test(lambda s: s[lambda s: s < 10], s)
s.index = s.index.map(float)
- self._run_test(lambda s: s[1.5:6], s, distributed=True)
+ self._run_test(lambda s: s[1.5:6], s)
def test_dataframe_getitem(self):
df = pd.DataFrame({'A': [x**2 for x in range(6)], 'B': list('abcdef')})
- self._run_test(lambda df: df['A'], df, distributed=True)
- self._run_test(lambda df: df[['A', 'B']], df, distributed=True)
+ self._run_test(lambda df: df['A'], df)
+ self._run_test(lambda df: df[['A', 'B']], df)
- self._run_test(lambda df: df[:], df, distributed=True)
- self._run_test(lambda df: df[df.A < 10], df, distributed=True)
+ self._run_test(lambda df: df[:], df)
+ self._run_test(lambda df: df[df.A < 10], df)
df.index = df.index.map(float)
- self._run_test(lambda df: df[1.5:4], df, distributed=True)
+ self._run_test(lambda df: df[1.5:4], df)
def test_loc(self):
dates = pd.date_range('1/1/2000', periods=8)
@@ -222,27 +216,23 @@ class DeferredFrameTest(unittest.TestCase):
for s in [pd.Series([1, 2, 3]),
pd.Series(range(100)),
pd.Series([x**3 for x in range(-50, 50)])]:
- self._run_test(lambda s: s.std(), s, distributed=True)
- self._run_test(lambda s: s.corr(s), s, distributed=True)
- self._run_test(lambda s: s.corr(s + 1), s, distributed=True)
- self._run_test(lambda s: s.corr(s * s), s, distributed=True)
- self._run_test(lambda s: s.cov(s * s), s, distributed=True)
+ self._run_test(lambda s: s.std(), s)
+ self._run_test(lambda s: s.corr(s), s)
+ self._run_test(lambda s: s.corr(s + 1), s)
+ self._run_test(lambda s: s.corr(s * s), s)
+ self._run_test(lambda s: s.cov(s * s), s)
def test_dataframe_cov_corr(self):
df = pd.DataFrame(np.random.randn(20, 3), columns=['a', 'b', 'c'])
df.loc[df.index[:5], 'a'] = np.nan
df.loc[df.index[5:10], 'b'] = np.nan
- self._run_test(lambda df: df.corr().round(8), df, distributed=True)
- self._run_test(lambda df: df.cov().round(8), df, distributed=True)
+ self._run_test(lambda df: df.corr().round(8), df)
+ self._run_test(lambda df: df.cov().round(8), df)
+ self._run_test(lambda df: df.corr(min_periods=12).round(8), df)
+ self._run_test(lambda df: df.cov(min_periods=12).round(8), df)
+ self._run_test(lambda df: df.corrwith(df.a).round(8), df)
self._run_test(
- lambda df: df.corr(min_periods=12).round(8), df, distributed=True)
- self._run_test(
- lambda df: df.cov(min_periods=12).round(8), df, distributed=True)
- self._run_test(lambda df: df.corrwith(df.a).round(8), df, distributed=True)
- self._run_test(
- lambda df: df[['a', 'b']].corrwith(df[['b', 'c']]).round(8),
- df,
- distributed=True)
+ lambda df: df[['a', 'b']].corrwith(df[['b', 'c']]).round(8), df)
def test_categorical_groupby(self):
df = pd.DataFrame({'A': np.arange(6), 'B': list('aabbca')})
@@ -251,9 +241,8 @@ class DeferredFrameTest(unittest.TestCase):
# TODO(BEAM-11190): These aggregations can be done in index partitions, but
# it will require a little more complex logic
with beam.dataframe.allow_non_parallel_operations():
- self._run_test(lambda df: df.groupby(level=0).sum(), df, distributed=True)
- self._run_test(
- lambda df: df.groupby(level=0).mean(), df, distributed=True)
+ self._run_test(lambda df: df.groupby(level=0).sum(), df)
+ self._run_test(lambda df: df.groupby(level=0).mean(), df)
class AllowNonParallelTest(unittest.TestCase):
diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
index a40f614..9afc21a 100644
--- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
+++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
@@ -237,6 +237,8 @@ class DoctestTest(unittest.TestCase):
'ser.groupby(["a", "b", "a", "b"]).mean()',
'ser.groupby(["a", "b", "a", np.nan]).mean()',
'ser.groupby(["a", "b", "a", np.nan], dropna=False).mean()',
+ # Grouping by a series is not supported
+ 'ser.groupby(ser > 100).mean()',
],
'pandas.core.series.Series.reindex': ['*'],
},
diff --git a/sdks/python/apache_beam/dataframe/partitionings.py b/sdks/python/apache_beam/dataframe/partitionings.py
index 53454b1..9baf9c9 100644
--- a/sdks/python/apache_beam/dataframe/partitionings.py
+++ b/sdks/python/apache_beam/dataframe/partitionings.py
@@ -115,6 +115,26 @@ class Index(Partitioning):
for key in range(num_partitions):
yield key, df[hashes % num_partitions == key]
+ def check(self, dfs):
+ # TODO(BEAM-11324): This check should be stronger, it should verify that
+ # running partition_fn on the concatenation of dfs yields the same
+ # partitions.
+ if self._levels is None:
+
+ def get_index_set(df):
+ return set(df.index)
+ else:
+
+ def get_index_set(df):
+ return set(zip(*[df.index.get_level_values(level) for level in self._levels]))
+
+ index_sets = [get_index_set(df) for df in dfs]
+ for i, index_set in enumerate(index_sets[:-1]):
+ if not index_set.isdisjoint(set.union(*index_sets[i + 1:])):
+ return False
+
+ return True
+
class Singleton(Partitioning):
"""A partitioning of all the data into a single partition.
@@ -134,6 +154,9 @@ class Singleton(Partitioning):
def partition_fn(self, df, num_partitions):
yield None, df
+ def check(self, dfs):
+ return len(dfs) <= 1
+
class Nothing(Partitioning):
"""A partitioning imposing no constraints on the actual partitioning.
@@ -162,3 +185,6 @@ class Nothing(Partitioning):
part = pd.Series(shuffled(range(len(df))), index=df.index) % num_partitions
for k in range(num_partitions):
yield k, df[part == k]
+
+ def check(self, dfs):
+ return True
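Taken together, the three check implementations added above encode the invariants that PartitioningSession now asserts: Index requires partitions to carry disjoint index values, Singleton requires at most one partition, and Nothing accepts anything. A small sketch of the contract (assuming the classes above, as committed, are importable):

    import pandas as pd
    from apache_beam.dataframe import partitionings

    a = pd.DataFrame({'v': [1, 2]}, index=[0, 1])
    b = pd.DataFrame({'v': [3, 4]}, index=[2, 3])
    c = pd.DataFrame({'v': [5]}, index=[1])  # overlaps with `a`

    print(partitionings.Index().check([a, b]))      # True: disjoint indexes
    print(partitionings.Index().check([a, c]))      # False: both hold index 1
    print(partitionings.Singleton().check([a]))     # True: single partition
    print(partitionings.Singleton().check([a, b]))  # False: more than one
    print(partitionings.Nothing().check([a, b]))    # True: no constraint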