You are viewing a plain text version of this content. The canonical link was not preserved in this plain text copy.
Posted to commits@beam.apache.org by bo...@apache.org on 2021/03/11 04:48:10 UTC

[beam] branch master updated: [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 51bb63f  [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state
     new 06ce34e  Merge pull request #14182 from [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state
51bb63f is described below

commit 51bb63fc05441c7e9208407ecd5e172c009a269f
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Tue Mar 9 20:32:34 2021 -0800

    [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state
---
 .../beam/fn/harness/WindowMergingFnRunner.java     |  9 ++++++++-
 .../beam/fn/harness/WindowMergingFnRunnerTest.java | 22 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
index edf0e0a..e7b169e 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowFn.MergeContext;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 
 /**
@@ -154,7 +155,13 @@ public abstract class WindowMergingFnRunner<T, W extends BoundedWindow> {
       for (KV<W, Collection<W>> mergedWindow : mergedWindows) {
         currentWindows.removeAll(mergedWindow.getValue());
       }
-      return KV.of(windowsToMerge.getKey(), KV.of(currentWindows, (Iterable) mergedWindows));
+      KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> result =
+          KV.of(
+              windowsToMerge.getKey(),
+              KV.of(Sets.newHashSet(currentWindows), (Iterable) Lists.newArrayList(mergedWindows)));
+      currentWindows.clear();
+      mergedWindows.clear();
+      return result;
     }
   }
 }
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java
index 9816ed6..359ea98 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java
@@ -105,6 +105,28 @@ public class WindowMergingFnRunnerTest {
         Iterables.getOnlyElement(output.getValue().getValue());
     assertEquals(new IntervalWindow(new Instant(7L), new Instant(11L)), mergedOutput.getKey());
     assertThat(mergedOutput.getValue(), containsInAnyOrder(expectedToBeMerged));
+
+    // Process a new group of windows and make sure the previous result has been cleaned up.
+    BoundedWindow[] expectedToBeMergedGroup2 =
+        new BoundedWindow[] {
+          new IntervalWindow(new Instant(15L), new Instant(17L)),
+          new IntervalWindow(new Instant(16L), new Instant(18L))
+        };
+
+    input =
+        KV.of(
+            "abc",
+            ImmutableList.<BoundedWindow>builder()
+                .add(expectedToBeMergedGroup2)
+                .addAll(expectedToBeUnmerged)
+                .build());
+
+    output = mapFunction.apply(input);
+    assertEquals(input.getKey(), output.getKey());
+    assertEquals(expectedToBeUnmerged, output.getValue().getKey());
+    mergedOutput = Iterables.getOnlyElement(output.getValue().getValue());
+    assertEquals(new IntervalWindow(new Instant(15L), new Instant(18L)), mergedOutput.getKey());
+    assertThat(mergedOutput.getValue(), containsInAnyOrder(expectedToBeMergedGroup2));
   }
 
   private static <W extends BoundedWindow> RunnerApi.PTransform createMergeTransformForWindowFn(