You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by tv...@apache.org on 2022/11/28 16:08:07 UTC

[beam] 01/01: Revert "Force discarding mode in with_fanout without rewindowing. (#23828)"

This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch revert-23828-fanout
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2897e1ac80167b1801a12a1b3dd3ce7a5b805f14
Author: tvalentyn <tv...@users.noreply.github.com>
AuthorDate: Mon Nov 28 08:07:57 2022 -0800

    Revert "Force discarding mode in with_fanout without rewindowing. (#23828)"
    
    This reverts commit 42688356bd27cd1db0e1b03212249d7f424d276c.
---
 .../apache_beam/transforms/combiners_test.py       | 61 ----------------------
 sdks/python/apache_beam/transforms/core.py         | 14 ++---
 2 files changed, 4 insertions(+), 71 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 2ea0ba862b9..7e0e83542ee 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -563,67 +563,6 @@ class CombineTest(unittest.TestCase):
 
       assert_that(result, has_expected_values)
 
-  def test_combining_with_sliding_windows_and_fanout(self):
-    options = PipelineOptions()
-    options.view_as(StandardOptions).streaming = True
-    with TestPipeline(options=options) as p:
-
-      def has_expected_values(actual):
-        from hamcrest.core import assert_that as hamcrest_assert
-        from hamcrest.library.collection import only_contains
-        ordered = sorted(actual)
-
-        hamcrest_assert(
-            ordered,
-            only_contains([0, 1, 2, 3], [0, 1, 2, 3, 5, 6, 7, 8], [5, 6, 7, 8]))
-
-      result = (
-          p
-          | beam.Create([
-              window.TimestampedValue(0, Timestamp(seconds=1666707510)),
-              window.TimestampedValue(1, Timestamp(seconds=1666707511)),
-              window.TimestampedValue(2, Timestamp(seconds=1666707512)),
-              window.TimestampedValue(3, Timestamp(seconds=1666707513)),
-              window.TimestampedValue(5, Timestamp(seconds=1666707515)),
-              window.TimestampedValue(6, Timestamp(seconds=1666707516)),
-              window.TimestampedValue(7, Timestamp(seconds=1666707517)),
-              window.TimestampedValue(8, Timestamp(seconds=1666707518))
-          ])
-          | beam.WindowInto(window.SlidingWindows(10, 5))
-          | beam.CombineGlobally(beam.combiners.ToListCombineFn()).
-          without_defaults().with_fanout(7))
-      assert_that(result, has_expected_values)
-
-  def test_combining_with_session_windows_and_fanout(self):
-    options = PipelineOptions()
-    options.view_as(StandardOptions).streaming = True
-    with TestPipeline(options=options) as p:
-
-      def has_expected_values(actual):
-        from hamcrest.core import assert_that as hamcrest_assert
-        from hamcrest.library.collection import only_contains
-        ordered = sorted(actual)
-
-        hamcrest_assert(ordered, only_contains([0, 1, 2, 3], [5, 6, 7, 8]))
-
-      result = (
-          p
-          | beam.Create([
-              window.TimestampedValue(0, Timestamp(seconds=1666707510)),
-              window.TimestampedValue(1, Timestamp(seconds=1666707511)),
-              window.TimestampedValue(2, Timestamp(seconds=1666707512)),
-              window.TimestampedValue(3, Timestamp(seconds=1666707513)),
-              window.TimestampedValue(5, Timestamp(seconds=1666707515)),
-              window.TimestampedValue(6, Timestamp(seconds=1666707516)),
-              window.TimestampedValue(7, Timestamp(seconds=1666707517)),
-              window.TimestampedValue(8, Timestamp(seconds=1666707518))
-          ])
-          | beam.WindowInto(window.Sessions(2))
-          | beam.CombineGlobally(beam.combiners.ToListCombineFn()).
-          without_defaults().with_fanout(7))
-
-      assert_that(result, has_expected_values)
-
   def test_MeanCombineFn_combine(self):
     with TestPipeline() as p:
       input = (
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 317844170ca..978e0e1eac3 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -2743,8 +2743,6 @@ class _CombinePerKeyWithHotKeyFanout(PTransform):
   def expand(self, pcoll):
 
     from apache_beam.transforms.trigger import AccumulationMode
-    from apache_beam.transforms.util import _IdentityWindowFn
-
     combine_fn = self._combine_fn
     fanout_fn = self._fanout_fn
 
@@ -2804,15 +2802,11 @@ class _CombinePerKeyWithHotKeyFanout(PTransform):
     precombined_hot = (
         hot
         # Avoid double counting that may happen with stacked accumulating mode.
-        | 'ForceDiscardingAccumulation' >> WindowInto(
-            _IdentityWindowFn(pcoll.windowing.windowfn.get_window_coder()),
-            trigger=pcoll.windowing.triggerfn,
-            accumulation_mode=AccumulationMode.DISCARDING,
-            timestamp_combiner=pcoll.windowing.timestamp_combiner,
-            allowed_lateness=pcoll.windowing.allowed_lateness)
+        | 'WindowIntoDiscarding' >> WindowInto(
+            pcoll.windowing, accumulation_mode=AccumulationMode.DISCARDING)
         | CombinePerKey(PreCombineFn())
-        | Map(StripNonce))
-
+        | Map(StripNonce)
+        | 'WindowIntoOriginal' >> WindowInto(pcoll.windowing))
     return ((cold, precombined_hot)
             | Flatten()
             | CombinePerKey(PostCombineFn()))