You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/28 23:03:13 UTC

[GitHub] [beam] aaltay commented on a change in pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

aaltay commented on a change in pull request #16354:
URL: https://github.com/apache/beam/pull/16354#discussion_r776094845



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -361,62 +377,331 @@ private CoGbkResult(CoGbkResultSchema schema, List<Iterable<?>> valueMap) {
   }
 
   /**
-   * Lazily filters and recasts an {@code Iterator<RawUnionValue>} into an {@code Iterator<V>},
-   * where V is the type of the raw union value's contents.
+   * A re-iterable that notifies an observer at every advance, and upon finishing, but only once
+   * across all copies.
+   *
+   * @param <T> The value type of the underlying iterable.
    */
-  private static class UnionValueIterator<V> implements Iterator<V> {
+  private static class ObservingReiterator<T> implements Reiterator<T> {
+
+    public interface Observer<T> {
+      /**
+       * Called exactly once, across all copies before advancing this iterator.
+       *
+       * <p>The iterator rather than the element is given so that the callee can perform a copy if
+       * desired. This class offers a peek method to get at the current element without disturbing
+       * the state of this iterator.
+       */
+      void observeAt(ObservingReiterator<T> reiterator);
+
+      /** Called exactly once, across all copies, once this iterator is exhausted. */
+      void done();
+    }
 
-    private final int tag;
-    private final PeekingIterator<RawUnionValue> unions;
-    private final Boolean[] containsTag;
+    private PeekingReiterator<IndexingReiterator.Indexed<T>> underlying;
+    private Observer<T> observer;
 
-    private UnionValueIterator(int tag, Iterator<RawUnionValue> unions, Boolean[] containsTag) {
-      this.tag = tag;
-      this.unions = Iterators.peekingIterator(unions);
-      this.containsTag = containsTag;
+    // Used to keep track of what has been observed so far.
+    private final int[] lastObserved;
+    private final boolean[] doneHasRun;
+    private final PeekingReiterator[] mostAdvanced;
+
+    public ObservingReiterator(Reiterator<T> underlying, Observer<T> observer) {
+      this(new PeekingReiterator<>(new IndexingReiterator<>(underlying)), observer);
+    }
+
+    public ObservingReiterator(
+        PeekingReiterator<IndexingReiterator.Indexed<T>> underlying, Observer<T> observer) {
+      this(
+          underlying,
+          observer,
+          new int[] {-1},
+          new boolean[] {false},
+          new PeekingReiterator[] {underlying});
+    }
+
+    private ObservingReiterator(
+        PeekingReiterator<IndexingReiterator.Indexed<T>> underlying,
+        Observer<T> observer,
+        int[] lastObserved,
+        boolean[] doneHasRun,
+        PeekingReiterator[] mostAdvanced) {
+      this.underlying = underlying;
+      this.observer = observer;
+      this.lastObserved = lastObserved;
+      this.doneHasRun = doneHasRun;
+      this.mostAdvanced = mostAdvanced;
+    }
+
+    @Override
+    public Reiterator<T> copy() {
+      return new ObservingReiterator<T>(
+          underlying.copy(), observer, lastObserved, doneHasRun, mostAdvanced);
     }
 
     @Override
     public boolean hasNext() {
-      if (Boolean.FALSE.equals(containsTag[tag])) {
-        return false;
+      boolean hasNext = underlying.hasNext();
+      if (!hasNext && !doneHasRun[0]) {
+        mostAdvanced[0] = underlying;
+        observer.done();
+        doneHasRun[0] = true;
       }
-      advance();
-      if (unions.hasNext()) {
-        return true;
-      } else {
-        // Now that we've iterated over all the values, we can resolve all the "unknown" null
-        // values to false.
-        for (int i = 0; i < containsTag.length; i++) {
-          if (containsTag[i] == null) {
-            containsTag[i] = false;
-          }
-        }
-        return false;
+      return hasNext;
+    }
+
+    @Override
+    public T next() {
+      peek(); // trigger observation *before* advancing
+      return underlying.next().value;
+    }
+
+    public T peek() {
+      IndexingReiterator.Indexed<T> next = underlying.peek();
+      if (next.index > lastObserved[0]) {
+        assert next.index == lastObserved[0] + 1;
+        mostAdvanced[0] = underlying;
+        lastObserved[0] = next.index;
+        observer.observeAt(this);
       }
+      return next.value;
+    }
+
+    public void fastForward() {
+      if (underlying != mostAdvanced[0]) {
+        underlying = mostAdvanced[0].copy();
+      }
+    }
+  }
+
+  /**
+   * Assigns a monotonically increasing index to each item in teh underling Reiterator.

Review comment:
       nit: s/teh/the

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -361,62 +377,331 @@ private CoGbkResult(CoGbkResultSchema schema, List<Iterable<?>> valueMap) {
   }
 
   /**
-   * Lazily filters and recasts an {@code Iterator<RawUnionValue>} into an {@code Iterator<V>},
-   * where V is the type of the raw union value's contents.
+   * A re-iterable that notifies an observer at every advance, and upon finishing, but only once
+   * across all copies.
+   *
+   * @param <T> The value type of the underlying iterable.
    */
-  private static class UnionValueIterator<V> implements Iterator<V> {
+  private static class ObservingReiterator<T> implements Reiterator<T> {
+
+    public interface Observer<T> {
+      /**
+       * Called exactly once, across all copies before advancing this iterator.
+       *
+       * <p>The iterator rather than the element is given so that the callee can perform a copy if
+       * desired. This class offers a peek method to get at the current element without disturbing
+       * the state of this iterator.
+       */
+      void observeAt(ObservingReiterator<T> reiterator);
+
+      /** Called exactly once, across all copies, once this iterator is exhausted. */
+      void done();
+    }
 
-    private final int tag;
-    private final PeekingIterator<RawUnionValue> unions;
-    private final Boolean[] containsTag;
+    private PeekingReiterator<IndexingReiterator.Indexed<T>> underlying;
+    private Observer<T> observer;
 
-    private UnionValueIterator(int tag, Iterator<RawUnionValue> unions, Boolean[] containsTag) {
-      this.tag = tag;
-      this.unions = Iterators.peekingIterator(unions);
-      this.containsTag = containsTag;
+    // Used to keep track of what has been observed so far.
+    private final int[] lastObserved;

Review comment:
       Why are these 3 variables arrays? Only index 0 is used.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org