You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by tv...@apache.org on 2022/11/28 16:08:07 UTC
[beam] 01/01: Revert "Force discarding mode in with_fanout without rewindowing. (#23828)"
This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch revert-23828-fanout
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 2897e1ac80167b1801a12a1b3dd3ce7a5b805f14
Author: tvalentyn <tv...@users.noreply.github.com>
AuthorDate: Mon Nov 28 08:07:57 2022 -0800
Revert "Force discarding mode in with_fanout without rewindowing. (#23828)"
This reverts commit 42688356bd27cd1db0e1b03212249d7f424d276c.
---
.../apache_beam/transforms/combiners_test.py | 61 ----------------------
sdks/python/apache_beam/transforms/core.py | 14 ++---
2 files changed, 4 insertions(+), 71 deletions(-)
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 2ea0ba862b9..7e0e83542ee 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -563,67 +563,6 @@ class CombineTest(unittest.TestCase):
assert_that(result, has_expected_values)
- def test_combining_with_sliding_windows_and_fanout(self):
- options = PipelineOptions()
- options.view_as(StandardOptions).streaming = True
- with TestPipeline(options=options) as p:
-
- def has_expected_values(actual):
- from hamcrest.core import assert_that as hamcrest_assert
- from hamcrest.library.collection import only_contains
- ordered = sorted(actual)
-
- hamcrest_assert(
- ordered,
- only_contains([0, 1, 2, 3], [0, 1, 2, 3, 5, 6, 7, 8], [5, 6, 7, 8]))
-
- result = (
- p
- | beam.Create([
- window.TimestampedValue(0, Timestamp(seconds=1666707510)),
- window.TimestampedValue(1, Timestamp(seconds=1666707511)),
- window.TimestampedValue(2, Timestamp(seconds=1666707512)),
- window.TimestampedValue(3, Timestamp(seconds=1666707513)),
- window.TimestampedValue(5, Timestamp(seconds=1666707515)),
- window.TimestampedValue(6, Timestamp(seconds=1666707516)),
- window.TimestampedValue(7, Timestamp(seconds=1666707517)),
- window.TimestampedValue(8, Timestamp(seconds=1666707518))
- ])
- | beam.WindowInto(window.SlidingWindows(10, 5))
- | beam.CombineGlobally(beam.combiners.ToListCombineFn()).
- without_defaults().with_fanout(7))
- assert_that(result, has_expected_values)
-
- def test_combining_with_session_windows_and_fanout(self):
- options = PipelineOptions()
- options.view_as(StandardOptions).streaming = True
- with TestPipeline(options=options) as p:
-
- def has_expected_values(actual):
- from hamcrest.core import assert_that as hamcrest_assert
- from hamcrest.library.collection import only_contains
- ordered = sorted(actual)
-
- hamcrest_assert(ordered, only_contains([0, 1, 2, 3], [5, 6, 7, 8]))
-
- result = (
- p
- | beam.Create([
- window.TimestampedValue(0, Timestamp(seconds=1666707510)),
- window.TimestampedValue(1, Timestamp(seconds=1666707511)),
- window.TimestampedValue(2, Timestamp(seconds=1666707512)),
- window.TimestampedValue(3, Timestamp(seconds=1666707513)),
- window.TimestampedValue(5, Timestamp(seconds=1666707515)),
- window.TimestampedValue(6, Timestamp(seconds=1666707516)),
- window.TimestampedValue(7, Timestamp(seconds=1666707517)),
- window.TimestampedValue(8, Timestamp(seconds=1666707518))
- ])
- | beam.WindowInto(window.Sessions(2))
- | beam.CombineGlobally(beam.combiners.ToListCombineFn()).
- without_defaults().with_fanout(7))
-
- assert_that(result, has_expected_values)
-
def test_MeanCombineFn_combine(self):
with TestPipeline() as p:
input = (
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 317844170ca..978e0e1eac3 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -2743,8 +2743,6 @@ class _CombinePerKeyWithHotKeyFanout(PTransform):
def expand(self, pcoll):
from apache_beam.transforms.trigger import AccumulationMode
- from apache_beam.transforms.util import _IdentityWindowFn
-
combine_fn = self._combine_fn
fanout_fn = self._fanout_fn
@@ -2804,15 +2802,11 @@ class _CombinePerKeyWithHotKeyFanout(PTransform):
precombined_hot = (
hot
# Avoid double counting that may happen with stacked accumulating mode.
- | 'ForceDiscardingAccumulation' >> WindowInto(
- _IdentityWindowFn(pcoll.windowing.windowfn.get_window_coder()),
- trigger=pcoll.windowing.triggerfn,
- accumulation_mode=AccumulationMode.DISCARDING,
- timestamp_combiner=pcoll.windowing.timestamp_combiner,
- allowed_lateness=pcoll.windowing.allowed_lateness)
+ | 'WindowIntoDiscarding' >> WindowInto(
+ pcoll.windowing, accumulation_mode=AccumulationMode.DISCARDING)
| CombinePerKey(PreCombineFn())
- | Map(StripNonce))
-
+ | Map(StripNonce)
+ | 'WindowIntoOriginal' >> WindowInto(pcoll.windowing))
return ((cold, precombined_hot)
| Flatten()
| CombinePerKey(PostCombineFn()))