You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by ib...@apache.org on 2020/05/04 20:20:43 UTC

[beam] branch release-2.21.0 updated: [BEAM-9877] Make BatchGroupAlsoByWindowViaIteratorsFn extend the ElementByteSizeObservableIterable so that size estimation is lazy

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.21.0 by this push:
     new 700da8a  [BEAM-9877] Make BatchGroupAlsoByWindowViaIteratorsFn extend the ElementByteSizeObservableIterable so that size estimation is lazy
     new 75efec2  Merge pull request #11601 from ibzib/BEAM-9877
700da8a is described below

commit 700da8afb4ee2d93fc6c609f1cb5080fde5defb4
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Mon May 4 15:24:47 2020 -0400

    [BEAM-9877] Make BatchGroupAlsoByWindowViaIteratorsFn extend the ElementByteSizeObservableIterable so that size estimation is lazy
---
 .../util/BatchGroupAlsoByWindowViaIteratorsFn.java      | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java
index 43e49dd..473a181 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java
@@ -33,6 +33,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterable;
+import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterator;
 import org.apache.beam.sdk.util.common.Reiterable;
 import org.apache.beam.sdk.util.common.Reiterator;
 import org.apache.beam.sdk.values.KV;
@@ -154,7 +156,8 @@ class BatchGroupAlsoByWindowViaIteratorsFn<K, V, W extends BoundedWindow>
    * {@link Reiterable} representing a view of all elements in a base {@link Reiterator} that are in
    * a given window.
    */
-  private static class WindowReiterable<V> implements Reiterable<V> {
+  private static class WindowReiterable<V>
+      extends ElementByteSizeObservableIterable<V, WindowReiterator<V>> implements Reiterable<V> {
     private PeekingReiterator<WindowedValue<V>> baseIterator;
     private BoundedWindow window;
 
@@ -165,12 +168,17 @@ class BatchGroupAlsoByWindowViaIteratorsFn<K, V, W extends BoundedWindow>
     }
 
     @Override
-    public Reiterator<V> iterator() {
+    public WindowReiterator<V> iterator() {
+      return createIterator();
+    }
+
+    @Override
+    protected WindowReiterator<V> createIterator() {
       // We don't copy the baseIterator when creating the first WindowReiterator
       // so that the WindowReiterator can advance the baseIterator.  We have to
       // make a copy afterwards so that future calls to iterator() will start
       // at the right spot.
-      Reiterator<V> result = new WindowReiterator<V>(baseIterator, window);
+      WindowReiterator<V> result = new WindowReiterator<V>(baseIterator, window);
       baseIterator = baseIterator.copy();
       return result;
     }
@@ -184,7 +192,8 @@ class BatchGroupAlsoByWindowViaIteratorsFn<K, V, W extends BoundedWindow>
   /**
    * The {@link Reiterator} used by {@link BatchGroupAlsoByWindowViaIteratorsFn.WindowReiterable}.
    */
-  private static class WindowReiterator<V> implements Reiterator<V> {
+  private static class WindowReiterator<V> extends ElementByteSizeObservableIterator<V>
+      implements Reiterator<V> {
     private PeekingReiterator<WindowedValue<V>> iterator;
     private BoundedWindow window;