You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/30 19:32:43 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

lukecwik commented on a change in pull request #16354:
URL: https://github.com/apache/beam/pull/16354#discussion_r776842670



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -361,62 +377,332 @@ private CoGbkResult(CoGbkResultSchema schema, List<Iterable<?>> valueMap) {
   }
 
   /**
-   * Lazily filters and recasts an {@code Iterator<RawUnionValue>} into an {@code Iterator<V>},
-   * where V is the type of the raw union value's contents.
+   * A re-iterable that notifies an observer at every advance, and upon finishing, but only once
+   * across all copies.
+   *
+   * @param <T> The value type of the underlying iterable.
    */
-  private static class UnionValueIterator<V> implements Iterator<V> {
+  private static class ObservingReiterator<T> implements Reiterator<T> {
+
+    public interface Observer<T> {
+      /**
+       * Called exactly once, across all copies, before advancing this iterator.
+       *
+       * <p>The iterator rather than the element is given so that the callee can perform a copy if
+       * desired. This class offers a peek method to get at the current element without disturbing
+       * the state of this iterator.
+       */
+      void observeAt(ObservingReiterator<T> reiterator);
+
+      /** Called exactly once, across all copies, once this iterator is exhausted. */
+      void done();
+    }
 
-    private final int tag;
-    private final PeekingIterator<RawUnionValue> unions;
-    private final Boolean[] containsTag;
+    private PeekingReiterator<IndexingReiterator.Indexed<T>> underlying;
+    private Observer<T> observer;
 
-    private UnionValueIterator(int tag, Iterator<RawUnionValue> unions, Boolean[] containsTag) {
-      this.tag = tag;
-      this.unions = Iterators.peekingIterator(unions);
-      this.containsTag = containsTag;
+    // Used to keep track of what has been observed so far.
+    // These are arrays to facilitate sharing values among all copies of the same root Reiterator.
+    private final int[] lastObserved;
+    private final boolean[] doneHasRun;
+    private final PeekingReiterator[] mostAdvanced;
+
+    public ObservingReiterator(Reiterator<T> underlying, Observer<T> observer) {
+      this(new PeekingReiterator<>(new IndexingReiterator<>(underlying)), observer);
+    }
+
+    public ObservingReiterator(
+        PeekingReiterator<IndexingReiterator.Indexed<T>> underlying, Observer<T> observer) {
+      this(
+          underlying,
+          observer,
+          new int[] {-1},
+          new boolean[] {false},
+          new PeekingReiterator[] {underlying});
+    }
+
+    private ObservingReiterator(
+        PeekingReiterator<IndexingReiterator.Indexed<T>> underlying,
+        Observer<T> observer,
+        int[] lastObserved,
+        boolean[] doneHasRun,
+        PeekingReiterator[] mostAdvanced) {
+      this.underlying = underlying;
+      this.observer = observer;
+      this.lastObserved = lastObserved;
+      this.doneHasRun = doneHasRun;
+      this.mostAdvanced = mostAdvanced;
+    }
+
+    @Override
+    public Reiterator<T> copy() {
+      return new ObservingReiterator<T>(
+          underlying.copy(), observer, lastObserved, doneHasRun, mostAdvanced);
     }
 
     @Override
     public boolean hasNext() {
-      if (Boolean.FALSE.equals(containsTag[tag])) {
-        return false;
+      boolean hasNext = underlying.hasNext();
+      if (!hasNext && !doneHasRun[0]) {
+        mostAdvanced[0] = underlying;
+        observer.done();
+        doneHasRun[0] = true;
       }
-      advance();
-      if (unions.hasNext()) {
-        return true;
-      } else {
-        // Now that we've iterated over all the values, we can resolve all the "unknown" null
-        // values to false.
-        for (int i = 0; i < containsTag.length; i++) {
-          if (containsTag[i] == null) {
-            containsTag[i] = false;
-          }
-        }
-        return false;
+      return hasNext;
+    }
+
+    @Override
+    public T next() {
+      peek(); // trigger observation *before* advancing
+      return underlying.next().value;
+    }
+
+    public T peek() {
+      IndexingReiterator.Indexed<T> next = underlying.peek();
+      if (next.index > lastObserved[0]) {
+        assert next.index == lastObserved[0] + 1;
+        mostAdvanced[0] = underlying;
+        lastObserved[0] = next.index;
+        observer.observeAt(this);
       }
+      return next.value;
+    }
+
+    public void fastForward() {
+      if (underlying != mostAdvanced[0]) {
+        underlying = mostAdvanced[0].copy();
+      }
+    }
+  }
+
+  /**
+   * Assigns a monotonically increasing index to each item in the underlying Reiterator.
+   *
+   * @param <T> The value type of the underlying iterable.
+   */
+  private static class IndexingReiterator<T> implements Reiterator<IndexingReiterator.Indexed<T>> {
+
+    private Reiterator<T> underlying;
+    private int index;
+
+    public IndexingReiterator(Reiterator<T> underlying) {
+      this(underlying, 0);
+    }
+
+    public IndexingReiterator(Reiterator<T> underlying, int start) {
+      this.underlying = underlying;
+      this.index = start;
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public V next() {
-      advance();
-      return (V) unions.next().getValue();
+    public IndexingReiterator<T> copy() {
+      return new IndexingReiterator(underlying.copy(), index);
     }
 
-    private void advance() {
-      while (unions.hasNext()) {
-        int curTag = unions.peek().getUnionTag();
-        containsTag[curTag] = true;
-        if (curTag == tag) {
-          break;
-        }
-        unions.next();
+    @Override
+    public boolean hasNext() {
+      return underlying.hasNext();
+    }
+
+    @Override
+    public Indexed<T> next() {
+      return new Indexed<T>(index++, underlying.next());
+    }
+
+    public static class Indexed<T> {
+      public final int index;
+      public final T value;
+
+      public Indexed(int index, T value) {
+        this.index = index;
+        this.value = value;
+      }
+    }
+  }
+
+  /**
+   * Adapts a Reiterator, giving it a peek() method that can be used to observe the next element
+   * without consuming it.
+   *
+   * @param <T> The value type of the underlying iterable.
+   */
+  private static class PeekingReiterator<T> implements Reiterator<T> {
+    private Reiterator<T> underlying;
+    private T next;
+    private boolean nextIsValid;
+
+    public PeekingReiterator(Reiterator<T> underlying) {
+      this(underlying, null, false);
+    }
+
+    private PeekingReiterator(Reiterator<T> underlying, T next, boolean nextIsValid) {
+      this.underlying = underlying;
+      this.next = next;
+      this.nextIsValid = nextIsValid;
+    }
+
+    @Override
+    public PeekingReiterator<T> copy() {
+      return new PeekingReiterator(underlying.copy(), next, nextIsValid);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return nextIsValid || underlying.hasNext();
+    }
+
+    @Override
+    public T next() {
+      if (nextIsValid) {
+        nextIsValid = false;
+        return next;
+      } else {
+        return underlying.next();
+      }
+    }
+
+    public T peek() {
+      if (!nextIsValid) {
+        next = underlying.next();
+        nextIsValid = true;
+      }
+      return next;
+    }
+  }
+
+  /**
+   * An Iterable corresponding to a single tag.
+   *
+   * <p>The values in this iterable are populated lazily via the offer method as tip advances for
+   * any tag.
+   *
+   * @param <T> The value type of the corresponding tag.
+   */
+  private static class TagIterable<T> implements Iterable<T> {
+    int tag;
+    int cacheSize;
+    Supplier<Boolean> forceCache;

Review comment:
       ?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -98,8 +104,7 @@ public CoGbkResult(
         throw new IllegalStateException(
             "union tag " + unionTag + " has no corresponding tuple tag in the result schema");
       }
-      List<Object> valueList = (List<Object>) valueMap.get(unionTag);
-      valueList.add(value.getValue());
+      valuesByTag.get(unionTag).add(value.getValue());
     }
 
     if (taggedIter.hasNext()) {

Review comment:
       nit: use guard style
   
   ```
   if (!taggedIter.hasNext()) {
     valueMap = (List) valuesByTag;
     return;
   }
   
   // If we get here, there were more elements than we can afford to
   // keep in memory, so we copy the re-iterable of remaining items
   // and append filtered views to each of the sorted lists computed earlier.
   ...
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -59,6 +60,8 @@
 
   private static final int DEFAULT_IN_MEMORY_ELEMENT_COUNT = 10_000;
 
+  private static final int DEFAULT_MIN_ELEMENTS_PER_TAG = 100;

Review comment:
       Based upon the code it looks like this is used as a max per tag.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org