You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/05/04 20:20:43 UTC
[beam] branch release-2.21.0 updated: [BEAM-9877] Make BatchGroupAlsoByWindowViaIteratorsFn extend the ElementByteSizeObservableIterable so that size estimation is lazy
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.21.0 by this push:
new 700da8a [BEAM-9877] Make BatchGroupAlsoByWindowViaIteratorsFn extend the ElementByteSizeObservableIterable so that size estimation is lazy
new 75efec2 Merge pull request #11601 from ibzib/BEAM-9877
700da8a is described below
commit 700da8afb4ee2d93fc6c609f1cb5080fde5defb4
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Mon May 4 15:24:47 2020 -0400
[BEAM-9877] Make BatchGroupAlsoByWindowViaIteratorsFn extend the ElementByteSizeObservableIterable so that size estimation is lazy
---
.../util/BatchGroupAlsoByWindowViaIteratorsFn.java | 17 +++++++++++++----
1 file changed, 13 insertions(+), 4 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java
index 43e49dd..473a181 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java
@@ -33,6 +33,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterable;
+import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterator;
import org.apache.beam.sdk.util.common.Reiterable;
import org.apache.beam.sdk.util.common.Reiterator;
import org.apache.beam.sdk.values.KV;
@@ -154,7 +156,8 @@ class BatchGroupAlsoByWindowViaIteratorsFn<K, V, W extends BoundedWindow>
* {@link Reiterable} representing a view of all elements in a base {@link Reiterator} that are in
* a given window.
*/
- private static class WindowReiterable<V> implements Reiterable<V> {
+ private static class WindowReiterable<V>
+ extends ElementByteSizeObservableIterable<V, WindowReiterator<V>> implements Reiterable<V> {
private PeekingReiterator<WindowedValue<V>> baseIterator;
private BoundedWindow window;
@@ -165,12 +168,17 @@ class BatchGroupAlsoByWindowViaIteratorsFn<K, V, W extends BoundedWindow>
}
@Override
- public Reiterator<V> iterator() {
+ public WindowReiterator<V> iterator() {
+ return createIterator();
+ }
+
+ @Override
+ protected WindowReiterator<V> createIterator() {
// We don't copy the baseIterator when creating the first WindowReiterator
// so that the WindowReiterator can advance the baseIterator. We have to
// make a copy afterwards so that future calls to iterator() will start
// at the right spot.
- Reiterator<V> result = new WindowReiterator<V>(baseIterator, window);
+ WindowReiterator<V> result = new WindowReiterator<V>(baseIterator, window);
baseIterator = baseIterator.copy();
return result;
}
@@ -184,7 +192,8 @@ class BatchGroupAlsoByWindowViaIteratorsFn<K, V, W extends BoundedWindow>
/**
* The {@link Reiterator} used by {@link BatchGroupAlsoByWindowViaIteratorsFn.WindowReiterable}.
*/
- private static class WindowReiterator<V> implements Reiterator<V> {
+ private static class WindowReiterator<V> extends ElementByteSizeObservableIterator<V>
+ implements Reiterator<V> {
private PeekingReiterator<WindowedValue<V>> iterator;
private BoundedWindow window;