You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2021/03/11 04:48:10 UTC
[beam] branch master updated: [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 51bb63f [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state
new 06ce34e Merge pull request #14182 from [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state
51bb63f is described below
commit 51bb63fc05441c7e9208407ecd5e172c009a269f
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Tue Mar 9 20:32:34 2021 -0800
[BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state
---
.../beam/fn/harness/WindowMergingFnRunner.java | 9 ++++++++-
.../beam/fn/harness/WindowMergingFnRunnerTest.java | 22 ++++++++++++++++++++++
2 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
index edf0e0a..e7b169e 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn.MergeContext;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
/**
@@ -154,7 +155,13 @@ public abstract class WindowMergingFnRunner<T, W extends BoundedWindow> {
for (KV<W, Collection<W>> mergedWindow : mergedWindows) {
currentWindows.removeAll(mergedWindow.getValue());
}
- return KV.of(windowsToMerge.getKey(), KV.of(currentWindows, (Iterable) mergedWindows));
+ KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> result =
+ KV.of(
+ windowsToMerge.getKey(),
+ KV.of(Sets.newHashSet(currentWindows), (Iterable) Lists.newArrayList(mergedWindows)));
+ currentWindows.clear();
+ mergedWindows.clear();
+ return result;
}
}
}
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java
index 9816ed6..359ea98 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java
@@ -105,6 +105,28 @@ public class WindowMergingFnRunnerTest {
Iterables.getOnlyElement(output.getValue().getValue());
assertEquals(new IntervalWindow(new Instant(7L), new Instant(11L)), mergedOutput.getKey());
assertThat(mergedOutput.getValue(), containsInAnyOrder(expectedToBeMerged));
+
+ // Process a new group of windows, make sure that previous result has been cleaned up.
+ BoundedWindow[] expectedToBeMergedGroup2 =
+ new BoundedWindow[] {
+ new IntervalWindow(new Instant(15L), new Instant(17L)),
+ new IntervalWindow(new Instant(16L), new Instant(18L))
+ };
+
+ input =
+ KV.of(
+ "abc",
+ ImmutableList.<BoundedWindow>builder()
+ .add(expectedToBeMergedGroup2)
+ .addAll(expectedToBeUnmerged)
+ .build());
+
+ output = mapFunction.apply(input);
+ assertEquals(input.getKey(), output.getKey());
+ assertEquals(expectedToBeUnmerged, output.getValue().getKey());
+ mergedOutput = Iterables.getOnlyElement(output.getValue().getValue());
+ assertEquals(new IntervalWindow(new Instant(15L), new Instant(18L)), mergedOutput.getKey());
+ assertThat(mergedOutput.getValue(), containsInAnyOrder(expectedToBeMergedGroup2));
}
private static <W extends BoundedWindow> RunnerApi.PTransform createMergeTransformForWindowFn(