You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ud...@apache.org on 2021/08/31 20:31:15 UTC
[beam] branch release-2.33.0 updated: Revert "Merge pull request #15271 Decreasing peak memory usage for beam.TupleCombineFn."
This is an automated email from the ASF dual-hosted git repository.
udim pushed a commit to branch release-2.33.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.33.0 by this push:
new 16de773 Revert "Merge pull request #15271 Decreasing peak memory usage for beam.TupleCombineFn."
new b3328ee Merge pull request #15422 from ibzib/combine-rollback-release
16de773 is described below
commit 16de7733bd9b7060113da5df3d70e48276274390
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Thu Aug 26 16:28:24 2021 -0700
Revert "Merge pull request #15271 Decreasing peak memory usage for beam.TupleCombineFn."
This reverts commit 4559c75863d9d6c9dd9e48c2b4f12f2139410524, reversing
changes made to 7611831443399f31fc505bc3451f2b56f245d4e4.
---
sdks/python/apache_beam/transforms/combiners.py | 34 ++++---------------
.../apache_beam/transforms/combiners_test.py | 38 +---------------------
2 files changed, 7 insertions(+), 65 deletions(-)
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 7e6b1f9..bcedd86 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -21,7 +21,6 @@
import copy
import heapq
-import itertools
import operator
import random
from typing import Any
@@ -598,24 +597,16 @@ class SampleCombineFn(core.CombineFn):
class _TupleCombineFnBase(core.CombineFn):
- def __init__(self, *combiners, merge_accumulators_batch_size=None):
+ def __init__(self, *combiners):
self._combiners = [core.CombineFn.maybe_from_callable(c) for c in combiners]
self._named_combiners = combiners
- # If the `merge_accumulators_batch_size` value is not specified, we chose a
- # bounded default that is inversely proportional to the number of
- # accumulators in merged tuples.
- self._merge_accumulators_batch_size = (
- merge_accumulators_batch_size or max(10, 1000 // len(combiners)))
def display_data(self):
combiners = [
c.__name__ if hasattr(c, '__name__') else c.__class__.__name__
for c in self._named_combiners
]
- return {
- 'combiners': str(combiners),
- 'merge_accumulators_batch_size': self._merge_accumulators_batch_size
- }
+ return {'combiners': str(combiners)}
def setup(self, *args, **kwargs):
for c in self._combiners:
@@ -625,23 +616,10 @@ class _TupleCombineFnBase(core.CombineFn):
return [c.create_accumulator(*args, **kwargs) for c in self._combiners]
def merge_accumulators(self, accumulators, *args, **kwargs):
- # Make sure that `accumulators` is an iterator (so that the position is
- # remembered).
- accumulators = iter(accumulators)
- result = next(accumulators)
- while True:
- # Load accumulators into memory and merge in batches to decrease peak
- # memory usage.
- accumulators_batch = list(
- itertools.islice(accumulators, self._merge_accumulators_batch_size))
- if not accumulators_batch:
- break
- accumulators_batch += [result]
- result = [
- c.merge_accumulators(a, *args, **kwargs) for c,
- a in zip(self._combiners, zip(*accumulators_batch))
- ]
- return result
+ return [
+ c.merge_accumulators(a, *args, **kwargs) for c,
+ a in zip(self._combiners, zip(*accumulators))
+ ]
def compact(self, accumulator, *args, **kwargs):
return [
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 68b273e..d826287 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -249,8 +249,7 @@ class CombineTest(unittest.TestCase):
dd = DisplayData.create_from(transform)
expected_items = [
DisplayDataItemMatcher('combine_fn', combine.TupleCombineFn),
- DisplayDataItemMatcher('combiners', "['max', 'MeanCombineFn', 'sum']"),
- DisplayDataItemMatcher('merge_accumulators_batch_size', 333),
+ DisplayDataItemMatcher('combiners', "['max', 'MeanCombineFn', 'sum']")
]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
@@ -359,41 +358,6 @@ class CombineTest(unittest.TestCase):
max).with_common_input()).without_defaults())
assert_that(result, equal_to([(1, 7.0 / 4, 3)]))
- def test_tuple_combine_fn_batched_merge(self):
- num_combine_fns = 10
- max_num_accumulators_in_memory = 30
- # Maximum number of accumulator tuples in memory - 1 for the merge result.
- merge_accumulators_batch_size = (
- max_num_accumulators_in_memory // num_combine_fns - 1)
- num_accumulator_tuples_to_merge = 20
-
- class CountedAccumulator:
- count = 0
- oom = False
-
- def __init__(self):
- if CountedAccumulator.count > max_num_accumulators_in_memory:
- CountedAccumulator.oom = True
- else:
- CountedAccumulator.count += 1
-
- class CountedAccumulatorCombineFn(beam.CombineFn):
- def create_accumulator(self):
- return CountedAccumulator()
-
- def merge_accumulators(self, accumulators):
- CountedAccumulator.count += 1
- for _ in accumulators:
- CountedAccumulator.count -= 1
-
- combine_fn = combine.TupleCombineFn(
- *[CountedAccumulatorCombineFn() for _ in range(num_combine_fns)],
- merge_accumulators_batch_size=merge_accumulators_batch_size)
- combine_fn.merge_accumulators(
- combine_fn.create_accumulator()
- for _ in range(num_accumulator_tuples_to_merge))
- assert not CountedAccumulator.oom
-
def test_to_list_and_to_dict1(self):
with TestPipeline() as pipeline:
the_list = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]