Posted to commits@beam.apache.org by ud...@apache.org on 2021/08/31 20:31:15 UTC

[beam] branch release-2.33.0 updated: Revert "Merge pull request #15271 Decreasing peak memory usage for beam.TupleCombineFn."

This is an automated email from the ASF dual-hosted git repository.

udim pushed a commit to branch release-2.33.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.33.0 by this push:
     new 16de773  Revert "Merge pull request #15271 Decreasing peak memory usage for beam.TupleCombineFn."
     new b3328ee  Merge pull request #15422 from ibzib/combine-rollback-release
16de773 is described below

commit 16de7733bd9b7060113da5df3d70e48276274390
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Thu Aug 26 16:28:24 2021 -0700

    Revert "Merge pull request #15271 Decreasing peak memory usage for beam.TupleCombineFn."
    
    This reverts commit 4559c75863d9d6c9dd9e48c2b4f12f2139410524, reversing
    changes made to 7611831443399f31fc505bc3451f2b56f245d4e4.
---
 sdks/python/apache_beam/transforms/combiners.py    | 34 ++++---------------
 .../apache_beam/transforms/combiners_test.py       | 38 +---------------------
 2 files changed, 7 insertions(+), 65 deletions(-)
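
For context, TupleCombineFn applies one CombineFn per tuple position of its
input. A minimal usage sketch (illustrative only, not part of this commit;
the combiner choices below are arbitrary):

    import apache_beam as beam
    from apache_beam.transforms import combiners

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([(1, 2, 3), (4, 5, 6)])
            | beam.CombineGlobally(
                combiners.TupleCombineFn(
                    min, combiners.MeanCombineFn(), max)).without_defaults()
            | beam.Map(print))  # expected output: (1, 3.5, 6)

The change reverted here only affects how this transform's accumulators are
merged internally; the user-facing usage above is unchanged apart from the
removal of the merge_accumulators_batch_size keyword argument.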

diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 7e6b1f9..bcedd86 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -21,7 +21,6 @@
 
 import copy
 import heapq
-import itertools
 import operator
 import random
 from typing import Any
@@ -598,24 +597,16 @@ class SampleCombineFn(core.CombineFn):
 
 
 class _TupleCombineFnBase(core.CombineFn):
-  def __init__(self, *combiners, merge_accumulators_batch_size=None):
+  def __init__(self, *combiners):
     self._combiners = [core.CombineFn.maybe_from_callable(c) for c in combiners]
     self._named_combiners = combiners
-    # If the `merge_accumulators_batch_size` value is not specified, we chose a
-    # bounded default that is inversely proportional to the number of
-    # accumulators in merged tuples.
-    self._merge_accumulators_batch_size = (
-        merge_accumulators_batch_size or max(10, 1000 // len(combiners)))
 
   def display_data(self):
     combiners = [
         c.__name__ if hasattr(c, '__name__') else c.__class__.__name__
         for c in self._named_combiners
     ]
-    return {
-        'combiners': str(combiners),
-        'merge_accumulators_batch_size': self._merge_accumulators_batch_size
-    }
+    return {'combiners': str(combiners)}
 
   def setup(self, *args, **kwargs):
     for c in self._combiners:
@@ -625,23 +616,10 @@ class _TupleCombineFnBase(core.CombineFn):
     return [c.create_accumulator(*args, **kwargs) for c in self._combiners]
 
   def merge_accumulators(self, accumulators, *args, **kwargs):
-    # Make sure that `accumulators` is an iterator (so that the position is
-    # remembered).
-    accumulators = iter(accumulators)
-    result = next(accumulators)
-    while True:
-      # Load accumulators into memory and merge in batches to decrease peak
-      # memory usage.
-      accumulators_batch = list(
-          itertools.islice(accumulators, self._merge_accumulators_batch_size))
-      if not accumulators_batch:
-        break
-      accumulators_batch += [result]
-      result = [
-          c.merge_accumulators(a, *args, **kwargs) for c,
-          a in zip(self._combiners, zip(*accumulators_batch))
-      ]
-    return result
+    return [
+        c.merge_accumulators(a, *args, **kwargs) for c,
+        a in zip(self._combiners, zip(*accumulators))
+    ]
 
   def compact(self, accumulator, *args, **kwargs):
     return [
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 68b273e..d826287 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -249,8 +249,7 @@ class CombineTest(unittest.TestCase):
     dd = DisplayData.create_from(transform)
     expected_items = [
         DisplayDataItemMatcher('combine_fn', combine.TupleCombineFn),
-        DisplayDataItemMatcher('combiners', "['max', 'MeanCombineFn', 'sum']"),
-        DisplayDataItemMatcher('merge_accumulators_batch_size', 333),
+        DisplayDataItemMatcher('combiners', "['max', 'MeanCombineFn', 'sum']")
     ]
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
@@ -359,41 +358,6 @@ class CombineTest(unittest.TestCase):
                   max).with_common_input()).without_defaults())
       assert_that(result, equal_to([(1, 7.0 / 4, 3)]))
 
-  def test_tuple_combine_fn_batched_merge(self):
-    num_combine_fns = 10
-    max_num_accumulators_in_memory = 30
-    # Maximum number of accumulator tuples in memory - 1 for the merge result.
-    merge_accumulators_batch_size = (
-        max_num_accumulators_in_memory // num_combine_fns - 1)
-    num_accumulator_tuples_to_merge = 20
-
-    class CountedAccumulator:
-      count = 0
-      oom = False
-
-      def __init__(self):
-        if CountedAccumulator.count > max_num_accumulators_in_memory:
-          CountedAccumulator.oom = True
-        else:
-          CountedAccumulator.count += 1
-
-    class CountedAccumulatorCombineFn(beam.CombineFn):
-      def create_accumulator(self):
-        return CountedAccumulator()
-
-      def merge_accumulators(self, accumulators):
-        CountedAccumulator.count += 1
-        for _ in accumulators:
-          CountedAccumulator.count -= 1
-
-    combine_fn = combine.TupleCombineFn(
-        *[CountedAccumulatorCombineFn() for _ in range(num_combine_fns)],
-        merge_accumulators_batch_size=merge_accumulators_batch_size)
-    combine_fn.merge_accumulators(
-        combine_fn.create_accumulator()
-        for _ in range(num_accumulator_tuples_to_merge))
-    assert not CountedAccumulator.oom
-
   def test_to_list_and_to_dict1(self):
     with TestPipeline() as pipeline:
       the_list = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
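
For reference, the batched-merge pattern removed by this revert (see the
combiners.py hunk above) can be sketched as follows. This is a paraphrase of
the reverted code, for illustration only: it drains the accumulators iterator
in bounded slices so that at most batch_size accumulator tuples are held in
memory at once, whereas the restored code materializes them all at once via
zip(*accumulators).

    import itertools

    def merge_in_batches(combiners, accumulators, batch_size):
        # `accumulators` is an iterable of accumulator tuples, one entry per
        # combiner; make it an iterator so its position is remembered.
        accumulators = iter(accumulators)
        result = next(accumulators)
        while True:
            # Load at most `batch_size` accumulator tuples into memory.
            batch = list(itertools.islice(accumulators, batch_size))
            if not batch:
                break
            batch.append(result)
            # Merge position-wise: one combiner per tuple slot.
            result = [
                c.merge_accumulators(a)
                for c, a in zip(combiners, zip(*batch))
            ]
        return result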