Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/24 10:06:32 UTC

[GitHub] [beam] robertwb opened a new pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

robertwb opened a new pull request #16354:
URL: https://github.com/apache/beam/pull/16354


   A minimum number of elements is cached for each tag, possibly in addition to a global number of cached elements.
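
The caching rule described above could be sketched roughly as follows. This is only an illustration and not the code in this pull request: the class `TagCachePolicy`, its method `admit`, and the parameters `minPerTag` and `globalBudget` are hypothetical names, and the way the per-tag minimum interacts with the global budget is an assumption made for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical admission policy; names and structure are illustrative only. */
class TagCachePolicy {
  private final int minPerTag;       // elements every tag may always cache
  private int globalRemaining;       // extra slots shared by all tags
  private final Map<Integer, Integer> cachedPerTag = new HashMap<>();

  TagCachePolicy(int minPerTag, int globalBudget) {
    this.minPerTag = minPerTag;
    this.globalRemaining = globalBudget;
  }

  /** Returns true if the next element seen for this tag should be kept in memory. */
  boolean admit(int tag) {
    int cached = cachedPerTag.getOrDefault(tag, 0);
    if (cached < minPerTag) {
      // Still below the guaranteed per-tag minimum: always cache.
      cachedPerTag.put(tag, cached + 1);
      return true;
    }
    if (globalRemaining > 0) {
      // Per-tag minimum is used up: draw from the shared budget instead.
      globalRemaining--;
      cachedPerTag.put(tag, cached + 1);
      return true;
    }
    // Neither the per-tag minimum nor the shared budget has room.
    return false;
  }
}
```

Under these assumptions, a tag that shows up late in the stream still gets its guaranteed slots even after earlier tags have used up the shared budget.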
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #16354:
URL: https://github.com/apache/beam/pull/16354#issuecomment-1003208581


   Run Java PreCommit





[GitHub] [beam] robertwb commented on a change in pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #16354:
URL: https://github.com/apache/beam/pull/16354#discussion_r776847987



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -361,62 +377,332 @@ private CoGbkResult(CoGbkResultSchema schema, List<Iterable<?>> valueMap) {
   }
 
   /**
-   * Lazily filters and recasts an {@code Iterator<RawUnionValue>} into an {@code Iterator<V>},
-   * where V is the type of the raw union value's contents.
+   * A re-iterable that notifies an observer at every advance, and upon finishing, but only once
+   * across all copies.
+   *
+   * @param <T> The value type of the underlying iterable.
    */
-  private static class UnionValueIterator<V> implements Iterator<V> {
+  private static class ObservingReiterator<T> implements Reiterator<T> {
+
+    public interface Observer<T> {
+      /**
+       * Called exactly once, across all copies before advancing this iterator.
+       *
+       * <p>The iterator rather than the element is given so that the callee can perform a copy if
+       * desired. This class offers a peek method to get at the current element without disturbing
+       * the state of this iterator.
+       */
+      void observeAt(ObservingReiterator<T> reiterator);
+
+      /** Called exactly once, across all copies, once this iterator is exhausted. */
+      void done();
+    }
 
-    private final int tag;
-    private final PeekingIterator<RawUnionValue> unions;
-    private final Boolean[] containsTag;
+    private PeekingReiterator<IndexingReiterator.Indexed<T>> underlying;
+    private Observer<T> observer;
 
-    private UnionValueIterator(int tag, Iterator<RawUnionValue> unions, Boolean[] containsTag) {
-      this.tag = tag;
-      this.unions = Iterators.peekingIterator(unions);
-      this.containsTag = containsTag;
+    // Used to keep track of what has been observed so far.
+    // These are arrays to facilitate sharing values among all copies of the same root Reiterator.
+    private final int[] lastObserved;
+    private final boolean[] doneHasRun;
+    private final PeekingReiterator[] mostAdvanced;
+
+    public ObservingReiterator(Reiterator<T> underlying, Observer<T> observer) {
+      this(new PeekingReiterator<>(new IndexingReiterator<>(underlying)), observer);
+    }
+
+    public ObservingReiterator(
+        PeekingReiterator<IndexingReiterator.Indexed<T>> underlying, Observer<T> observer) {
+      this(
+          underlying,
+          observer,
+          new int[] {-1},
+          new boolean[] {false},
+          new PeekingReiterator[] {underlying});
+    }
+
+    private ObservingReiterator(
+        PeekingReiterator<IndexingReiterator.Indexed<T>> underlying,
+        Observer<T> observer,
+        int[] lastObserved,
+        boolean[] doneHasRun,
+        PeekingReiterator[] mostAdvanced) {
+      this.underlying = underlying;
+      this.observer = observer;
+      this.lastObserved = lastObserved;
+      this.doneHasRun = doneHasRun;
+      this.mostAdvanced = mostAdvanced;
+    }
+
+    @Override
+    public Reiterator<T> copy() {
+      return new ObservingReiterator<T>(
+          underlying.copy(), observer, lastObserved, doneHasRun, mostAdvanced);
     }
 
     @Override
     public boolean hasNext() {
-      if (Boolean.FALSE.equals(containsTag[tag])) {
-        return false;
+      boolean hasNext = underlying.hasNext();
+      if (!hasNext && !doneHasRun[0]) {
+        mostAdvanced[0] = underlying;
+        observer.done();
+        doneHasRun[0] = true;
       }
-      advance();
-      if (unions.hasNext()) {
-        return true;
-      } else {
-        // Now that we've iterated over all the values, we can resolve all the "unknown" null
-        // values to false.
-        for (int i = 0; i < containsTag.length; i++) {
-          if (containsTag[i] == null) {
-            containsTag[i] = false;
-          }
-        }
-        return false;
+      return hasNext;
+    }
+
+    @Override
+    public T next() {
+      peek(); // trigger observation *before* advancing
+      return underlying.next().value;
+    }
+
+    public T peek() {
+      IndexingReiterator.Indexed<T> next = underlying.peek();
+      if (next.index > lastObserved[0]) {
+        assert next.index == lastObserved[0] + 1;
+        mostAdvanced[0] = underlying;
+        lastObserved[0] = next.index;
+        observer.observeAt(this);
       }
+      return next.value;
+    }
+
+    public void fastForward() {
+      if (underlying != mostAdvanced[0]) {
+        underlying = mostAdvanced[0].copy();
+      }
+    }
+  }
+
+  /**
+   * Assigns a monotonically increasing index to each item in the underlying Reiterator.
+   *
+   * @param <T> The value type of the underlying iterable.
+   */
+  private static class IndexingReiterator<T> implements Reiterator<IndexingReiterator.Indexed<T>> {
+
+    private Reiterator<T> underlying;
+    private int index;
+
+    public IndexingReiterator(Reiterator<T> underlying) {
+      this(underlying, 0);
+    }
+
+    public IndexingReiterator(Reiterator<T> underlying, int start) {
+      this.underlying = underlying;
+      this.index = start;
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public V next() {
-      advance();
-      return (V) unions.next().getValue();
+    public IndexingReiterator<T> copy() {
+      return new IndexingReiterator(underlying.copy(), index);
     }
 
-    private void advance() {
-      while (unions.hasNext()) {
-        int curTag = unions.peek().getUnionTag();
-        containsTag[curTag] = true;
-        if (curTag == tag) {
-          break;
-        }
-        unions.next();
+    @Override
+    public boolean hasNext() {
+      return underlying.hasNext();
+    }
+
+    @Override
+    public Indexed<T> next() {
+      return new Indexed<T>(index++, underlying.next());
+    }
+
+    public static class Indexed<T> {
+      public final int index;
+      public final T value;
+
+      public Indexed(int index, T value) {
+        this.index = index;
+        this.value = value;
+      }
+    }
+  }
+
+  /**
+   * Adapts a Reiterator, giving it a peek() method that can be used to observe the next element
+   * without consuming it.
+   *
+   * @param <T> The value type of the underlying iterable.
+   */
+  private static class PeekingReiterator<T> implements Reiterator<T> {
+    private Reiterator<T> underlying;
+    private T next;
+    private boolean nextIsValid;
+
+    public PeekingReiterator(Reiterator<T> underlying) {
+      this(underlying, null, false);
+    }
+
+    private PeekingReiterator(Reiterator<T> underlying, T next, boolean nextIsValid) {
+      this.underlying = underlying;
+      this.next = next;
+      this.nextIsValid = nextIsValid;
+    }
+
+    @Override
+    public PeekingReiterator<T> copy() {
+      return new PeekingReiterator(underlying.copy(), next, nextIsValid);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return nextIsValid || underlying.hasNext();
+    }
+
+    @Override
+    public T next() {
+      if (nextIsValid) {
+        nextIsValid = false;
+        return next;
+      } else {
+        return underlying.next();
+      }
+    }
+
+    public T peek() {
+      if (!nextIsValid) {
+        next = underlying.next();
+        nextIsValid = true;
+      }
+      return next;
+    }
+  }
+
+  /**
+   * An Iterable corresponding to a single tag.
+   *
+   * <p>The values in this iterable are populated lazily via the offer method as tip advances for
+   * any tag.
+   *
+   * @param <T> The value type of the corresponding tag.
+   */
+  private static class TagIterable<T> implements Iterable<T> {
+    int tag;
+    int cacheSize;
+    Supplier<Boolean> forceCache;

Review comment:
       Leftover from a refactor, removing.







[GitHub] [beam] robertwb commented on a change in pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #16354:
URL: https://github.com/apache/beam/pull/16354#discussion_r776154114



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -361,62 +377,331 @@ private CoGbkResult(CoGbkResultSchema schema, List<Iterable<?>> valueMap) {
   }
 
   /**
-   * Lazily filters and recasts an {@code Iterator<RawUnionValue>} into an {@code Iterator<V>},
-   * where V is the type of the raw union value's contents.
+   * A re-iterable that notifies an observer at every advance, and upon finishing, but only once
+   * across all copies.
+   *
+   * @param <T> The value type of the underlying iterable.
    */
-  private static class UnionValueIterator<V> implements Iterator<V> {
+  private static class ObservingReiterator<T> implements Reiterator<T> {
+
+    public interface Observer<T> {
+      /**
+       * Called exactly once, across all copies before advancing this iterator.
+       *
+       * <p>The iterator rather than the element is given so that the callee can perform a copy if
+       * desired. This class offers a peek method to get at the current element without disturbing
+       * the state of this iterator.
+       */
+      void observeAt(ObservingReiterator<T> reiterator);
+
+      /** Called exactly once, across all copies, once this iterator is exhausted. */
+      void done();
+    }
 
-    private final int tag;
-    private final PeekingIterator<RawUnionValue> unions;
-    private final Boolean[] containsTag;
+    private PeekingReiterator<IndexingReiterator.Indexed<T>> underlying;
+    private Observer<T> observer;
 
-    private UnionValueIterator(int tag, Iterator<RawUnionValue> unions, Boolean[] containsTag) {
-      this.tag = tag;
-      this.unions = Iterators.peekingIterator(unions);
-      this.containsTag = containsTag;
+    // Used to keep track of what has been observed so far.
+    private final int[] lastObserved;

Review comment:
       They are single-element arrays because we want to share these values among all copies of the reiterator. Basically they're like pointers. Added a comment to clarify.
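
A minimal sketch of the array-as-pointer idea, for illustration only. `SharedProgressIterator` and its members are hypothetical and much simpler than the `ObservingReiterator` in the diff. Each copy keeps its own `position`, while the one-element `maxObserved` array is passed by reference to every copy, so an update made through one copy is visible through all of them.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical illustration; not part of the Beam codebase. */
class SharedProgressIterator<T> implements Iterator<T> {
  private final List<T> items;
  private int position;            // private to each copy
  private final int[] maxObserved; // one-element array shared by all copies

  SharedProgressIterator(List<T> items) {
    this(items, 0, new int[] {-1});
  }

  private SharedProgressIterator(List<T> items, int position, int[] maxObserved) {
    this.items = items;
    this.position = position;
    this.maxObserved = maxObserved;
  }

  /** The copy starts at this iterator's position and shares the same array. */
  SharedProgressIterator<T> copy() {
    return new SharedProgressIterator<>(items, position, maxObserved);
  }

  @Override
  public boolean hasNext() {
    return position < items.size();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    // Writing through the array updates the value seen by every copy.
    maxObserved[0] = Math.max(maxObserved[0], position);
    return items.get(position++);
  }

  /** Highest index that any copy has returned so far, or -1 if none. */
  int maxObserved() {
    return maxObserved[0];
  }

  public static void main(String[] args) {
    SharedProgressIterator<String> first =
        new SharedProgressIterator<>(Arrays.asList("a", "b", "c"));
    SharedProgressIterator<String> second = first.copy();
    second.next();
    second.next();
    // first has not advanced, yet it sees the progress made through second.
    System.out.println(first.maxObserved()); // prints 1
  }
}
```

A plain `int` field would be copied by value into each new iterator, which is why a mutable holder such as a one-element array (or an `AtomicInteger`) is needed for the sharing to work.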

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -361,62 +377,331 @@ private CoGbkResult(CoGbkResultSchema schema, List<Iterable<?>> valueMap) {
   }
 
   /**
-   * Lazily filters and recasts an {@code Iterator<RawUnionValue>} into an {@code Iterator<V>},
-   * where V is the type of the raw union value's contents.
+   * A re-iterable that notifies an observer at every advance, and upon finishing, but only once
+   * across all copies.
+   *
+   * @param <T> The value type of the underlying iterable.
    */
-  private static class UnionValueIterator<V> implements Iterator<V> {
+  private static class ObservingReiterator<T> implements Reiterator<T> {
+
+    public interface Observer<T> {
+      /**
+       * Called exactly once, across all copies before advancing this iterator.
+       *
+       * <p>The iterator rather than the element is given so that the callee can perform a copy if
+       * desired. This class offers a peek method to get at the current element without disturbing
+       * the state of this iterator.
+       */
+      void observeAt(ObservingReiterator<T> reiterator);
+
+      /** Called exactly once, across all copies, once this iterator is exhausted. */
+      void done();
+    }
 
-    private final int tag;
-    private final PeekingIterator<RawUnionValue> unions;
-    private final Boolean[] containsTag;
+    private PeekingReiterator<IndexingReiterator.Indexed<T>> underlying;
+    private Observer<T> observer;
 
-    private UnionValueIterator(int tag, Iterator<RawUnionValue> unions, Boolean[] containsTag) {
-      this.tag = tag;
-      this.unions = Iterators.peekingIterator(unions);
-      this.containsTag = containsTag;
+    // Used to keep track of what has been observed so far.
+    private final int[] lastObserved;
+    private final boolean[] doneHasRun;
+    private final PeekingReiterator[] mostAdvanced;
+
+    public ObservingReiterator(Reiterator<T> underlying, Observer<T> observer) {
+      this(new PeekingReiterator<>(new IndexingReiterator<>(underlying)), observer);
+    }
+
+    public ObservingReiterator(
+        PeekingReiterator<IndexingReiterator.Indexed<T>> underlying, Observer<T> observer) {
+      this(
+          underlying,
+          observer,
+          new int[] {-1},
+          new boolean[] {false},
+          new PeekingReiterator[] {underlying});
+    }
+
+    private ObservingReiterator(
+        PeekingReiterator<IndexingReiterator.Indexed<T>> underlying,
+        Observer<T> observer,
+        int[] lastObserved,
+        boolean[] doneHasRun,
+        PeekingReiterator[] mostAdvanced) {
+      this.underlying = underlying;
+      this.observer = observer;
+      this.lastObserved = lastObserved;
+      this.doneHasRun = doneHasRun;
+      this.mostAdvanced = mostAdvanced;
+    }
+
+    @Override
+    public Reiterator<T> copy() {
+      return new ObservingReiterator<T>(
+          underlying.copy(), observer, lastObserved, doneHasRun, mostAdvanced);
     }
 
     @Override
     public boolean hasNext() {
-      if (Boolean.FALSE.equals(containsTag[tag])) {
-        return false;
+      boolean hasNext = underlying.hasNext();
+      if (!hasNext && !doneHasRun[0]) {
+        mostAdvanced[0] = underlying;
+        observer.done();
+        doneHasRun[0] = true;
       }
-      advance();
-      if (unions.hasNext()) {
-        return true;
-      } else {
-        // Now that we've iterated over all the values, we can resolve all the "unknown" null
-        // values to false.
-        for (int i = 0; i < containsTag.length; i++) {
-          if (containsTag[i] == null) {
-            containsTag[i] = false;
-          }
-        }
-        return false;
+      return hasNext;
+    }
+
+    @Override
+    public T next() {
+      peek(); // trigger observation *before* advancing
+      return underlying.next().value;
+    }
+
+    public T peek() {
+      IndexingReiterator.Indexed<T> next = underlying.peek();
+      if (next.index > lastObserved[0]) {
+        assert next.index == lastObserved[0] + 1;
+        mostAdvanced[0] = underlying;
+        lastObserved[0] = next.index;
+        observer.observeAt(this);
       }
+      return next.value;
+    }
+
+    public void fastForward() {
+      if (underlying != mostAdvanced[0]) {
+        underlying = mostAdvanced[0].copy();
+      }
+    }
+  }
+
+  /**
+   * Assigns a monotonically increasing index to each item in teh underling Reiterator.

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] aaltay commented on pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #16354:
URL: https://github.com/apache/beam/pull/16354#issuecomment-1002189290


   FYI, here is a reference to the previously failing precommit: https://ci-beam.apache.org/job/beam_PreCommit_Java_Phrase/4426/





[GitHub] [beam] lukecwik commented on pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #16354:
URL: https://github.com/apache/beam/pull/16354#issuecomment-1003445197


   SpotBugs consistently fails with:
   ```
   DLS 	Dead store to tail in new org.apache.beam.sdk.transforms.join.CoGbkResult(CoGbkResultSchema, Iterable, int, int)
   ```
   
   Fixed in #16407 
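
For readers unfamiliar with the `DLS` category: it flags a local variable that is assigned a value which is never read afterwards. The snippet below is a generic sketch of that pattern and its usual fix. It is not the code from `CoGbkResult`; the class `DeadStoreDemo` and both methods are hypothetical, and what the actual dead store to `tail` looked like is not shown in this thread.

```java
/** Hypothetical demo class illustrating a dead local store in general terms. */
public class DeadStoreDemo {

  /** Contains a dead store: 'last' is written on every pass but never read. */
  public static int sumWithDeadStore(int[] values) {
    int total = 0;
    int last = 0;
    for (int v : values) {
      total += v;
      last = v; // dead store: nothing reads 'last' after this assignment
    }
    return total;
  }

  /** Same behavior with the unused variable removed. */
  public static int sumFixed(int[] values) {
    int total = 0;
    for (int v : values) {
      total += v;
    }
    return total;
  }

  public static void main(String[] args) {
    int[] data = {1, 2, 3};
    System.out.println(sumWithDeadStore(data)); // 6
    System.out.println(sumFixed(data)); // 6
  }
}
```

Removing a dead store does not change results; the warning mostly signals either leftover code or a value that was meant to be used and was forgotten.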





[GitHub] [beam] lukecwik commented on a change in pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16354:
URL: https://github.com/apache/beam/pull/16354#discussion_r776845531



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGbkResultTest.java
##########
@@ -57,6 +74,175 @@ public void runLazyResult(int cacheSize) {
     assertThat(result.getAll(new TupleTag<>("tag0")), contains(0, 2, 4));
   }
 
+  @Test
+  public void testLazyResults() {
+    TestUnionValues values = new TestUnionValues(0, 0, 1, 1, 0, 1, 1);
+    CoGbkResult result = new CoGbkResult(createSchema(5), values, 0, 2);
+    // Nothing is read until we try to iterate.
+    assertThat(values.maxPos(), equalTo(0));
+    Iterable<?> tag0iterable = result.getAll("tag0");
+    assertThat(values.maxPos(), equalTo(0));
+    tag0iterable.iterator();
+    assertThat(values.maxPos(), equalTo(0));
+
+    // Iterating reads (nearly) the minimal number of values.
+    Iterator<?> tag0 = tag0iterable.iterator();
+    tag0.next();
+    assertThat(values.maxPos(), lessThanOrEqualTo(2));
+    tag0.next();
+    assertThat(values.maxPos(), equalTo(2));
+    // Note that we're skipping over tag 1.
+    tag0.next();
+    assertThat(values.maxPos(), equalTo(5));
+
+    // Iterating again does not cause more reads.
+    Iterator<?> tag0iterAgain = tag0iterable.iterator();
+    tag0iterAgain.next();
+    tag0iterAgain.next();
+    tag0iterAgain.next();
+    assertThat(values.maxPos(), equalTo(5));
+
+    // Iterating over other tags does not cause more reads for values we have seen.
+    Iterator<?> tag1 = result.getAll("tag1").iterator();
+    tag1.next();
+    tag1.next();
+    assertThat(values.maxPos(), equalTo(5));
+    // However, finding the next tag1 value does require more reads.
+    tag1.next();
+    assertThat(values.maxPos(), equalTo(6));
+  }
+
+  @Test
+  @SuppressWarnings("BoxedPrimitiveEquality")
+  public void testCachedResults() {
+    // Ensure we don't fail below due to a non-default java.lang.Integer.IntegerCache.high setting.

Review comment:
       ?







[GitHub] [beam] robertwb commented on a change in pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #16354:
URL: https://github.com/apache/beam/pull/16354#discussion_r776851490



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -98,8 +104,7 @@ public CoGbkResult(
         throw new IllegalStateException(
             "union tag " + unionTag + " has no corresponding tuple tag in the result schema");
       }
-      List<Object> valueList = (List<Object>) valueMap.get(unionTag);
-      valueList.add(value.getValue());
+      valuesByTag.get(unionTag).add(value.getValue());
     }
 
     if (taggedIter.hasNext()) {

Review comment:
       There are pros and cons to this, but I agree it's a slight improvement. Done.







[GitHub] [beam] robertwb commented on pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #16354:
URL: https://github.com/apache/beam/pull/16354#issuecomment-1003251305









[GitHub] [beam] aaltay commented on a change in pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #16354:
URL: https://github.com/apache/beam/pull/16354#discussion_r776094845



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -361,62 +377,331 @@ private CoGbkResult(CoGbkResultSchema schema, List<Iterable<?>> valueMap) {
   }
 
   /**
-   * Lazily filters and recasts an {@code Iterator<RawUnionValue>} into an {@code Iterator<V>},
-   * where V is the type of the raw union value's contents.
+   * A re-iterable that notifies an observer at every advance, and upon finishing, but only once
+   * across all copies.
+   *
+   * @param <T> The value type of the underlying iterable.
    */
-  private static class UnionValueIterator<V> implements Iterator<V> {
+  private static class ObservingReiterator<T> implements Reiterator<T> {
+
+    public interface Observer<T> {
+      /**
+       * Called exactly once, across all copies before advancing this iterator.
+       *
+       * <p>The iterator rather than the element is given so that the callee can perform a copy if
+       * desired. This class offers a peek method to get at the current element without disturbing
+       * the state of this iterator.
+       */
+      void observeAt(ObservingReiterator<T> reiterator);
+
+      /** Called exactly once, across all copies, once this iterator is exhausted. */
+      void done();
+    }
 
-    private final int tag;
-    private final PeekingIterator<RawUnionValue> unions;
-    private final Boolean[] containsTag;
+    private PeekingReiterator<IndexingReiterator.Indexed<T>> underlying;
+    private Observer<T> observer;
 
-    private UnionValueIterator(int tag, Iterator<RawUnionValue> unions, Boolean[] containsTag) {
-      this.tag = tag;
-      this.unions = Iterators.peekingIterator(unions);
-      this.containsTag = containsTag;
+    // Used to keep track of what has been observed so far.
+    private final int[] lastObserved;
+    private final boolean[] doneHasRun;
+    private final PeekingReiterator[] mostAdvanced;
+
+    public ObservingReiterator(Reiterator<T> underlying, Observer<T> observer) {
+      this(new PeekingReiterator<>(new IndexingReiterator<>(underlying)), observer);
+    }
+
+    public ObservingReiterator(
+        PeekingReiterator<IndexingReiterator.Indexed<T>> underlying, Observer<T> observer) {
+      this(
+          underlying,
+          observer,
+          new int[] {-1},
+          new boolean[] {false},
+          new PeekingReiterator[] {underlying});
+    }
+
+    private ObservingReiterator(
+        PeekingReiterator<IndexingReiterator.Indexed<T>> underlying,
+        Observer<T> observer,
+        int[] lastObserved,
+        boolean[] doneHasRun,
+        PeekingReiterator[] mostAdvanced) {
+      this.underlying = underlying;
+      this.observer = observer;
+      this.lastObserved = lastObserved;
+      this.doneHasRun = doneHasRun;
+      this.mostAdvanced = mostAdvanced;
+    }
+
+    @Override
+    public Reiterator<T> copy() {
+      return new ObservingReiterator<T>(
+          underlying.copy(), observer, lastObserved, doneHasRun, mostAdvanced);
     }
 
     @Override
     public boolean hasNext() {
-      if (Boolean.FALSE.equals(containsTag[tag])) {
-        return false;
+      boolean hasNext = underlying.hasNext();
+      if (!hasNext && !doneHasRun[0]) {
+        mostAdvanced[0] = underlying;
+        observer.done();
+        doneHasRun[0] = true;
       }
-      advance();
-      if (unions.hasNext()) {
-        return true;
-      } else {
-        // Now that we've iterated over all the values, we can resolve all the "unknown" null
-        // values to false.
-        for (int i = 0; i < containsTag.length; i++) {
-          if (containsTag[i] == null) {
-            containsTag[i] = false;
-          }
-        }
-        return false;
+      return hasNext;
+    }
+
+    @Override
+    public T next() {
+      peek(); // trigger observation *before* advancing
+      return underlying.next().value;
+    }
+
+    public T peek() {
+      IndexingReiterator.Indexed<T> next = underlying.peek();
+      if (next.index > lastObserved[0]) {
+        assert next.index == lastObserved[0] + 1;
+        mostAdvanced[0] = underlying;
+        lastObserved[0] = next.index;
+        observer.observeAt(this);
       }
+      return next.value;
+    }
+
+    public void fastForward() {
+      if (underlying != mostAdvanced[0]) {
+        underlying = mostAdvanced[0].copy();
+      }
+    }
+  }
+
+  /**
+   * Assigns a monotonically increasing index to each item in teh underling Reiterator.

Review comment:
       nit: s/teh/the

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -361,62 +377,331 @@ private CoGbkResult(CoGbkResultSchema schema, List<Iterable<?>> valueMap) {
   }
 
   /**
-   * Lazily filters and recasts an {@code Iterator<RawUnionValue>} into an {@code Iterator<V>},
-   * where V is the type of the raw union value's contents.
+   * A re-iterable that notifies an observer at every advance, and upon finishing, but only once
+   * across all copies.
+   *
+   * @param <T> The value type of the underlying iterable.
    */
-  private static class UnionValueIterator<V> implements Iterator<V> {
+  private static class ObservingReiterator<T> implements Reiterator<T> {
+
+    public interface Observer<T> {
+      /**
+       * Called exactly once, across all copies before advancing this iterator.
+       *
+       * <p>The iterator rather than the element is given so that the callee can perform a copy if
+       * desired. This class offers a peek method to get at the current element without disturbing
+       * the state of this iterator.
+       */
+      void observeAt(ObservingReiterator<T> reiterator);
+
+      /** Called exactly once, across all copies, once this iterator is exhausted. */
+      void done();
+    }
 
-    private final int tag;
-    private final PeekingIterator<RawUnionValue> unions;
-    private final Boolean[] containsTag;
+    private PeekingReiterator<IndexingReiterator.Indexed<T>> underlying;
+    private Observer<T> observer;
 
-    private UnionValueIterator(int tag, Iterator<RawUnionValue> unions, Boolean[] containsTag) {
-      this.tag = tag;
-      this.unions = Iterators.peekingIterator(unions);
-      this.containsTag = containsTag;
+    // Used to keep track of what has been observed so far.
+    private final int[] lastObserved;

Review comment:
       Why are these 3 variables arrays? Only index 0 is used.







[GitHub] [beam] emilymye commented on a change in pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
emilymye commented on a change in pull request #16354:
URL: https://github.com/apache/beam/pull/16354#discussion_r776817563



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -361,62 +377,332 @@ private CoGbkResult(CoGbkResultSchema schema, List<Iterable<?>> valueMap) {
   }
 
   /**
-   * Lazily filters and recasts an {@code Iterator<RawUnionValue>} into an {@code Iterator<V>},
-   * where V is the type of the raw union value's contents.
+   * A re-iterable that notifies an observer at every advance, and upon finishing, but only once
+   * across all copies.
+   *
+   * @param <T> The value type of the underlying iterable.
    */
-  private static class UnionValueIterator<V> implements Iterator<V> {
+  private static class ObservingReiterator<T> implements Reiterator<T> {
+
+    public interface Observer<T> {
+      /**
+       * Called exactly once, across all copies before advancing this iterator.
+       *
+       * <p>The iterator rather than the element is given so that the callee can perform a copy if
+       * desired. This class offers a peek method to get at the current element without disturbing
+       * the state of this iterator.
+       */
+      void observeAt(ObservingReiterator<T> reiterator);
+
+      /** Called exactly once, across all copies, once this iterator is exhausted. */
+      void done();
+    }
 
-    private final int tag;
-    private final PeekingIterator<RawUnionValue> unions;
-    private final Boolean[] containsTag;
+    private PeekingReiterator<IndexingReiterator.Indexed<T>> underlying;
+    private Observer<T> observer;
 
-    private UnionValueIterator(int tag, Iterator<RawUnionValue> unions, Boolean[] containsTag) {
-      this.tag = tag;
-      this.unions = Iterators.peekingIterator(unions);
-      this.containsTag = containsTag;
+    // Used to keep track of what has been observed so far.
+    // These are arrays to facilitate sharing values among all copies of the same root Reiterator.
+    private final int[] lastObserved;
+    private final boolean[] doneHasRun;
+    private final PeekingReiterator[] mostAdvanced;
+
+    public ObservingReiterator(Reiterator<T> underlying, Observer<T> observer) {
+      this(new PeekingReiterator<>(new IndexingReiterator<>(underlying)), observer);
+    }
+
+    public ObservingReiterator(
+        PeekingReiterator<IndexingReiterator.Indexed<T>> underlying, Observer<T> observer) {
+      this(
+          underlying,
+          observer,
+          new int[] {-1},
+          new boolean[] {false},
+          new PeekingReiterator[] {underlying});
+    }
+
+    private ObservingReiterator(
+        PeekingReiterator<IndexingReiterator.Indexed<T>> underlying,
+        Observer<T> observer,
+        int[] lastObserved,
+        boolean[] doneHasRun,
+        PeekingReiterator[] mostAdvanced) {
+      this.underlying = underlying;
+      this.observer = observer;
+      this.lastObserved = lastObserved;
+      this.doneHasRun = doneHasRun;
+      this.mostAdvanced = mostAdvanced;
+    }
+
+    @Override
+    public Reiterator<T> copy() {
+      return new ObservingReiterator<T>(
+          underlying.copy(), observer, lastObserved, doneHasRun, mostAdvanced);
     }
 
     @Override
     public boolean hasNext() {
-      if (Boolean.FALSE.equals(containsTag[tag])) {
-        return false;
+      boolean hasNext = underlying.hasNext();
+      if (!hasNext && !doneHasRun[0]) {
+        mostAdvanced[0] = underlying;
+        observer.done();
+        doneHasRun[0] = true;
       }
-      advance();
-      if (unions.hasNext()) {
-        return true;
-      } else {
-        // Now that we've iterated over all the values, we can resolve all the "unknown" null
-        // values to false.
-        for (int i = 0; i < containsTag.length; i++) {
-          if (containsTag[i] == null) {
-            containsTag[i] = false;
-          }
-        }
-        return false;
+      return hasNext;
+    }
+
+    @Override
+    public T next() {
+      peek(); // trigger observation *before* advancing
+      return underlying.next().value;
+    }
+
+    public T peek() {
+      IndexingReiterator.Indexed<T> next = underlying.peek();
+      if (next.index > lastObserved[0]) {
+        assert next.index == lastObserved[0] + 1;
+        mostAdvanced[0] = underlying;
+        lastObserved[0] = next.index;
+        observer.observeAt(this);
       }
+      return next.value;
+    }
+
+    public void fastForward() {
+      if (underlying != mostAdvanced[0]) {
+        underlying = mostAdvanced[0].copy();
+      }
+    }
+  }
+
+  /**
+   * Assigns a monotonically increasing index to each item in the underling Reiterator.
+   *
+   * @param <T> The value type of the underlying iterable.
+   */
+  private static class IndexingReiterator<T> implements Reiterator<IndexingReiterator.Indexed<T>> {
+
+    private Reiterator<T> underlying;
+    private int index;
+
+    public IndexingReiterator(Reiterator<T> underlying) {
+      this(underlying, 0);
+    }
+
+    public IndexingReiterator(Reiterator<T> underlying, int start) {
+      this.underlying = underlying;
+      this.index = start;
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public V next() {
-      advance();
-      return (V) unions.next().getValue();
+    public IndexingReiterator<T> copy() {
+      return new IndexingReiterator(underlying.copy(), index);
     }
 
-    private void advance() {
-      while (unions.hasNext()) {
-        int curTag = unions.peek().getUnionTag();
-        containsTag[curTag] = true;
-        if (curTag == tag) {
-          break;
-        }
-        unions.next();
+    @Override
+    public boolean hasNext() {
+      return underlying.hasNext();
+    }
+
+    @Override
+    public Indexed<T> next() {
+      return new Indexed<T>(index++, underlying.next());
+    }
+
+    public static class Indexed<T> {
+      public final int index;
+      public final T value;
+
+      public Indexed(int index, T value) {
+        this.index = index;
+        this.value = value;
+      }
+    }
+  }
+
+  /**
+   * Adapts an Reiterator, giving it a peek() method that can be used to observe the next element
+   * without consuming it.
+   *
+   * @param <T> The value type of the underlying iterable.
+   */
+  private static class PeekingReiterator<T> implements Reiterator<T> {
+    private Reiterator<T> underlying;
+    private T next;
+    private boolean nextIsValid;
+
+    public PeekingReiterator(Reiterator<T> underlying) {
+      this(underlying, null, false);
+    }
+
+    private PeekingReiterator(Reiterator<T> underlying, T next, boolean nextIsValid) {
+      this.underlying = underlying;
+      this.next = next;
+      this.nextIsValid = nextIsValid;
+    }
+
+    @Override
+    public PeekingReiterator<T> copy() {
+      return new PeekingReiterator(underlying.copy(), next, nextIsValid);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return nextIsValid || underlying.hasNext();
+    }
+
+    @Override
+    public T next() {
+      if (nextIsValid) {
+        nextIsValid = false;
+        return next;
+      } else {
+        return underlying.next();
+      }
+    }
+
+    public T peek() {
+      if (!nextIsValid) {
+        next = underlying.next();
+        nextIsValid = true;
+      }
+      return next;
+    }
+  }
+
+  /**
+   * An Iterable corresponding to a single tag.
+   *
+   * <p>The values in this iterable are populated lazily via the offer method as tip advances for
+   * any tag.
+   *
+   * @param <T> The value type of the corresponging tag.
+   */
+  private static class TagIterable<T> implements Iterable<T> {
+    int tag;
+    int cacheSize;
+    Supplier<Boolean> forceCache;
+
+    ObservingReiterator<RawUnionValue> tip;
+
+    List<T> head;
+    Reiterator<RawUnionValue> tail;
+    boolean finished;
+
+    public TagIterable(
+        List<T> head, int tag, int cacheSize, ObservingReiterator<RawUnionValue> tip) {
+      this.tag = tag;
+      this.cacheSize = cacheSize;
+      this.head = head;
+      this.tip = tip;
+    }
+
+    void offer(ObservingReiterator<RawUnionValue> tail) {
+      assert !finished;
+      assert tail.peek().getUnionTag() == tag;
+      if (head.size() < cacheSize) {
+        head.add((T) tail.peek().getValue());
+      } else if (this.tail == null) {
+        this.tail = tail.copy();
+      }
+    }
+
+    void finish() {
+      finished = true;
+    }
+
+    void seek(int tag) {
+      while (tip.hasNext() && tip.peek().getUnionTag() != tag) {
+        tip.next();
       }
     }
 
     @Override
-    public void remove() {
-      throw new UnsupportedOperationException();
+    public Iterator<T> iterator() {
+      return new Iterator<T>() {
+
+        boolean isDone;
+        boolean advanced;
+        T next;
+
+        /** Keeps track of the index, in head, that this iterator points to. */
+        int index = -1;
+        /** If the index is beyond what was cached in head, this is this iterators view of tail. */
+        Iterator<T> tailIter;
+
+        @Override
+        public boolean hasNext() {
+          if (!advanced) {
+            advance();
+          }
+          return !isDone;
+        }
+
+        @Override
+        public T next() {
+          if (!advanced) {
+            advance();
+          }
+          if (isDone) {
+            throw new NoSuchElementException();
+          }
+          advanced = false;
+          return next;
+        }
+
+        private void advance() {
+          assert !advanced;
+          assert !isDone;
+          advanced = true;
+
+          index++;
+          if (maybeAdvance()) {
+            return;
+          }
+
+          // We were unable to advance; advance tip to populate either head or tail.
+          tip.fastForward();
+          if (tip.hasNext()) {
+            tip.next();
+            seek(tag);
+          }
+
+          // A this point, either head or tail should be sufficient to advance.
+          assert maybeAdvance();
+        }
+
+        private boolean maybeAdvance() {
+          if (index < head.size()) {
+            // First consume head.
+            assert tailIter == null;
+            next = head.get(index);
+            return true;
+          } else if (tail != null) {
+            // Next consume tail, if any.
+            if (tailIter == null) {
+              tailIter =
+                  Iterators.transform(
+                      Iterators.filter(
+                          tail.copy(), taggedUnion -> taggedUnion.getUnionTag() == tag),
+                      taggedUnion -> (T) taggedUnion.getValue());
+            }
+            if (tailIter.hasNext()) {
+              next = tailIter.next();
+            } else {
+              isDone = true;
+            }
+            return true;
+          } else if (finished) {
+            // If there are no more elements in head, and tail was not populated, and we are
+            // finshed, this is the end of the iteration.
+            isDone = true;
+            return true;
+          } else {
+            // We need more lements in either head or tail.

Review comment:
       ```suggestion
               // We need more elements in either head or tail.
   ```
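
The head/tail split that `maybeAdvance()` relies on can be pictured with a much simpler stand-in. This is a sketch under stated assumptions, not the PR's `TagIterable`: `HeadTailDemo` and `HeadTailIterable` are hypothetical names, there is a single tag, and the tail is reached by re-reading the source and skipping the cached prefix, whereas the diff keeps a `Reiterator` copy positioned at the tail. What it shares with the diff is the idea that a bounded number of leading elements live in an in-memory `head` and everything beyond that is streamed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical demo class; a simplified stand-in for the head/tail idea. */
public class HeadTailDemo {

  /** Caches up to cacheSize leading elements; later ones are re-read from the source. */
  public static final class HeadTailIterable<T> implements Iterable<T> {
    private final Iterable<T> source; // assumed re-iterable with a stable order
    private final int cacheSize;
    private final List<T> head = new ArrayList<>();
    private boolean headLoaded;

    public HeadTailIterable(Iterable<T> source, int cacheSize) {
      this.source = source;
      this.cacheSize = cacheSize;
    }

    @Override
    public Iterator<T> iterator() {
      if (!headLoaded) {
        // Fill the in-memory head once; later iterators reuse it.
        Iterator<T> it = source.iterator();
        while (head.size() < cacheSize && it.hasNext()) {
          head.add(it.next());
        }
        headLoaded = true;
      }
      return new Iterator<T>() {
        private int index = 0; // position within head
        private Iterator<T> tail; // opened lazily, only if head is exhausted

        @Override
        public boolean hasNext() {
          return index < head.size() || openTail().hasNext();
        }

        @Override
        public T next() {
          if (index < head.size()) {
            return head.get(index++); // first consume head
          }
          return openTail().next(); // then stream from the tail
        }

        private Iterator<T> openTail() {
          if (tail == null) {
            tail = source.iterator();
            // Skip the elements already served from head.
            for (int i = 0; i < head.size() && tail.hasNext(); i++) {
              tail.next();
            }
          }
          return tail;
        }
      };
    }
  }

  public static void main(String[] args) {
    HeadTailIterable<Integer> values = new HeadTailIterable<>(Arrays.asList(1, 2, 3, 4, 5), 2);
    for (int pass = 0; pass < 2; pass++) {
      List<Integer> seen = new ArrayList<>();
      for (Integer v : values) {
        seen.add(v);
      }
      System.out.println(seen); // [1, 2, 3, 4, 5] on both passes
    }
  }
}
```

The trade-off is the usual one: a larger cache makes repeated iteration cheaper but holds more elements in memory, while elements past the cache cost a re-read each time they are visited.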

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGbkResultTest.java
##########
@@ -57,6 +74,175 @@ public void runLazyResult(int cacheSize) {
     assertThat(result.getAll(new TupleTag<>("tag0")), contains(0, 2, 4));
   }
 
+  @Test
+  public void testLazyResults() {
+    TestUnionValues values = new TestUnionValues(0, 0, 1, 1, 0, 1, 1);
+    CoGbkResult result = new CoGbkResult(createSchema(5), values, 0, 2);
+    // Nothing is read until we try to iterate.
+    assertThat(values.maxPos(), equalTo(0));
+    Iterable<?> tag0iterable = result.getAll("tag0");
+    assertThat(values.maxPos(), equalTo(0));
+    tag0iterable.iterator();
+    assertThat(values.maxPos(), equalTo(0));
+
+    // Iterating reads (nearly) the minimal number of values.
+    Iterator<?> tag0 = tag0iterable.iterator();
+    tag0.next();
+    assertThat(values.maxPos(), lessThanOrEqualTo(2));
+    tag0.next();
+    assertThat(values.maxPos(), equalTo(2));
+    // Note that we're skipping over tag 1.
+    tag0.next();
+    assertThat(values.maxPos(), equalTo(5));
+
+    // Iterating again does not cause more reads.
+    Iterator<?> tag0iterAgain = tag0iterable.iterator();
+    tag0iterAgain.next();
+    tag0iterAgain.next();
+    tag0iterAgain.next();
+    assertThat(values.maxPos(), equalTo(5));
+
+    // Iterating over other tags does not cause more reads for values we have seen.
+    Iterator<?> tag1 = result.getAll("tag1").iterator();
+    tag1.next();
+    tag1.next();
+    assertThat(values.maxPos(), equalTo(5));
+    // However, finding the next tag1 value does require more reads.
+    tag1.next();
+    assertThat(values.maxPos(), equalTo(6));
+  }
+
+  @Test
+  @SuppressWarnings("BoxedPrimitiveEquality")
+  public void testCachedResults() {
+    // Ensure we don't fail below due to odd VM settings.

Review comment:
       Out of curiosity, what odd VM settings?
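
For context, an earlier revision of this test comment named the setting as `java.lang.Integer.IntegerCache.high`. The sketch below shows why that matters for a test annotated with `@SuppressWarnings("BoxedPrimitiveEquality")`: reference equality on boxed integers is only guaranteed for values from -128 to 127, and beyond that it depends on how far the JVM's box cache extends. `BoxedIdentityDemo` is a hypothetical class for illustration; the mention of the HotSpot flag `-XX:AutoBoxCacheMax` is an assumption about how that bound is typically raised, not something stated in this thread.

```java
/** Hypothetical demo class showing identity versus equality for boxed integers. */
public class BoxedIdentityDemo {

  /** Reference comparison: true only if both arguments are the same object. */
  public static boolean sameObject(Integer a, Integer b) {
    return a == b;
  }

  public static void main(String[] args) {
    // Boxing 127 always yields the cached instance, so identity holds.
    System.out.println(sameObject(Integer.valueOf(127), Integer.valueOf(127))); // true

    Integer big1 = Integer.valueOf(100_000);
    Integer big2 = Integer.valueOf(100_000);
    // Value equality always holds.
    System.out.println(big1.equals(big2)); // true
    // Identity for large values depends on the cache's upper bound, which can be
    // raised (for example with -XX:AutoBoxCacheMax), so no result is asserted here.
    System.out.println(sameObject(big1, big2));
  }
}
```

A test that uses `==` on boxed values to check whether the very same object was returned therefore needs values outside any plausible cache range, or objects that are not `Integer` at all.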

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -361,62 +377,332 @@ private CoGbkResult(CoGbkResultSchema schema, List<Iterable<?>> valueMap) {
   }
 
   /**
-   * Lazily filters and recasts an {@code Iterator<RawUnionValue>} into an {@code Iterator<V>},
-   * where V is the type of the raw union value's contents.
+   * A re-iterable that notifies an observer at every advance, and upon finishing, but only once
+   * across all copies.
+   *
+   * @param <T> The value type of the underlying iterable.
    */
-  private static class UnionValueIterator<V> implements Iterator<V> {
+  private static class ObservingReiterator<T> implements Reiterator<T> {
+
+    public interface Observer<T> {
+      /**
+       * Called exactly once, across all copies before advancing this iterator.
+       *
+       * <p>The iterator rather than the element is given so that the callee can perform a copy if
+       * desired. This class offers a peek method to get at the current element without disturbing
+       * the state of this iterator.
+       */
+      void observeAt(ObservingReiterator<T> reiterator);
+
+      /** Called exactly once, across all copies, once this iterator is exhausted. */
+      void done();
+    }
 
-    private final int tag;
-    private final PeekingIterator<RawUnionValue> unions;
-    private final Boolean[] containsTag;
+    private PeekingReiterator<IndexingReiterator.Indexed<T>> underlying;
+    private Observer<T> observer;
 
-    private UnionValueIterator(int tag, Iterator<RawUnionValue> unions, Boolean[] containsTag) {
-      this.tag = tag;
-      this.unions = Iterators.peekingIterator(unions);
-      this.containsTag = containsTag;
+    // Used to keep track of what has been observed so far.
+    // These are arrays to facilitate sharing values among all copies of the same root Reiterator.
+    private final int[] lastObserved;
+    private final boolean[] doneHasRun;
+    private final PeekingReiterator[] mostAdvanced;
+
+    public ObservingReiterator(Reiterator<T> underlying, Observer<T> observer) {
+      this(new PeekingReiterator<>(new IndexingReiterator<>(underlying)), observer);
+    }
+
+    public ObservingReiterator(
+        PeekingReiterator<IndexingReiterator.Indexed<T>> underlying, Observer<T> observer) {
+      this(
+          underlying,
+          observer,
+          new int[] {-1},
+          new boolean[] {false},
+          new PeekingReiterator[] {underlying});
+    }
+
+    private ObservingReiterator(
+        PeekingReiterator<IndexingReiterator.Indexed<T>> underlying,
+        Observer<T> observer,
+        int[] lastObserved,
+        boolean[] doneHasRun,
+        PeekingReiterator[] mostAdvanced) {
+      this.underlying = underlying;
+      this.observer = observer;
+      this.lastObserved = lastObserved;
+      this.doneHasRun = doneHasRun;
+      this.mostAdvanced = mostAdvanced;
+    }
+
+    @Override
+    public Reiterator<T> copy() {
+      return new ObservingReiterator<T>(
+          underlying.copy(), observer, lastObserved, doneHasRun, mostAdvanced);
     }
 
     @Override
     public boolean hasNext() {
-      if (Boolean.FALSE.equals(containsTag[tag])) {
-        return false;
+      boolean hasNext = underlying.hasNext();
+      if (!hasNext && !doneHasRun[0]) {
+        mostAdvanced[0] = underlying;
+        observer.done();
+        doneHasRun[0] = true;
       }
-      advance();
-      if (unions.hasNext()) {
-        return true;
-      } else {
-        // Now that we've iterated over all the values, we can resolve all the "unknown" null
-        // values to false.
-        for (int i = 0; i < containsTag.length; i++) {
-          if (containsTag[i] == null) {
-            containsTag[i] = false;
-          }
-        }
-        return false;
+      return hasNext;
+    }
+
+    @Override
+    public T next() {
+      peek(); // trigger observation *before* advancing
+      return underlying.next().value;
+    }
+
+    public T peek() {
+      IndexingReiterator.Indexed<T> next = underlying.peek();
+      if (next.index > lastObserved[0]) {
+        assert next.index == lastObserved[0] + 1;
+        mostAdvanced[0] = underlying;
+        lastObserved[0] = next.index;
+        observer.observeAt(this);
       }
+      return next.value;
+    }
+
+    public void fastForward() {
+      if (underlying != mostAdvanced[0]) {
+        underlying = mostAdvanced[0].copy();
+      }
+    }
+  }
+
+  /**
+   * Assigns a monotonically increasing index to each item in the underlying Reiterator.
+   *
+   * @param <T> The value type of the underlying iterable.
+   */
+  private static class IndexingReiterator<T> implements Reiterator<IndexingReiterator.Indexed<T>> {
+
+    private Reiterator<T> underlying;
+    private int index;
+
+    public IndexingReiterator(Reiterator<T> underlying) {
+      this(underlying, 0);
+    }
+
+    public IndexingReiterator(Reiterator<T> underlying, int start) {
+      this.underlying = underlying;
+      this.index = start;
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public V next() {
-      advance();
-      return (V) unions.next().getValue();
+    public IndexingReiterator<T> copy() {
+      return new IndexingReiterator(underlying.copy(), index);
     }
 
-    private void advance() {
-      while (unions.hasNext()) {
-        int curTag = unions.peek().getUnionTag();
-        containsTag[curTag] = true;
-        if (curTag == tag) {
-          break;
-        }
-        unions.next();
+    @Override
+    public boolean hasNext() {
+      return underlying.hasNext();
+    }
+
+    @Override
+    public Indexed<T> next() {
+      return new Indexed<T>(index++, underlying.next());
+    }
+
+    public static class Indexed<T> {
+      public final int index;
+      public final T value;
+
+      public Indexed(int index, T value) {
+        this.index = index;
+        this.value = value;
+      }
+    }
+  }
+
+  /**
+   * Adapts a Reiterator, giving it a peek() method that can be used to observe the next element
+   * without consuming it.
+   *
+   * @param <T> The value type of the underlying iterable.
+   */
+  private static class PeekingReiterator<T> implements Reiterator<T> {
+    private Reiterator<T> underlying;
+    private T next;
+    private boolean nextIsValid;
+
+    public PeekingReiterator(Reiterator<T> underlying) {
+      this(underlying, null, false);
+    }
+
+    private PeekingReiterator(Reiterator<T> underlying, T next, boolean nextIsValid) {
+      this.underlying = underlying;
+      this.next = next;
+      this.nextIsValid = nextIsValid;
+    }
+
+    @Override
+    public PeekingReiterator<T> copy() {
+      return new PeekingReiterator(underlying.copy(), next, nextIsValid);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return nextIsValid || underlying.hasNext();
+    }
+
+    @Override
+    public T next() {
+      if (nextIsValid) {
+        nextIsValid = false;
+        return next;
+      } else {
+        return underlying.next();
+      }
+    }
+
+    public T peek() {
+      if (!nextIsValid) {
+        next = underlying.next();
+        nextIsValid = true;
+      }
+      return next;
+    }
+  }
+
+  /**
+   * An Iterable corresponding to a single tag.
+   *
+   * <p>The values in this iterable are populated lazily via the offer method as tip advances for
+   * any tag.
+   *
+   * @param <T> The value type of the corresponding tag.
+   */
+  private static class TagIterable<T> implements Iterable<T> {
+    int tag;
+    int cacheSize;
+    Supplier<Boolean> forceCache;
+
+    ObservingReiterator<RawUnionValue> tip;
+
+    List<T> head;
+    Reiterator<RawUnionValue> tail;
+    boolean finished;
+
+    public TagIterable(
+        List<T> head, int tag, int cacheSize, ObservingReiterator<RawUnionValue> tip) {
+      this.tag = tag;
+      this.cacheSize = cacheSize;
+      this.head = head;
+      this.tip = tip;
+    }
+
+    void offer(ObservingReiterator<RawUnionValue> tail) {
+      assert !finished;
+      assert tail.peek().getUnionTag() == tag;
+      if (head.size() < cacheSize) {
+        head.add((T) tail.peek().getValue());
+      } else if (this.tail == null) {
+        this.tail = tail.copy();
+      }
+    }
+
+    void finish() {
+      finished = true;
+    }
+
+    void seek(int tag) {
+      while (tip.hasNext() && tip.peek().getUnionTag() != tag) {
+        tip.next();
       }
     }
 
     @Override
-    public void remove() {
-      throw new UnsupportedOperationException();
+    public Iterator<T> iterator() {
+      return new Iterator<T>() {
+
+        boolean isDone;
+        boolean advanced;
+        T next;
+
+        /** Keeps track of the index, in head, that this iterator points to. */
+        int index = -1;
+        /** If the index is beyond what was cached in head, this is this iterator's view of tail. */
+        Iterator<T> tailIter;
+
+        @Override
+        public boolean hasNext() {
+          if (!advanced) {
+            advance();
+          }
+          return !isDone;
+        }
+
+        @Override
+        public T next() {
+          if (!advanced) {
+            advance();
+          }
+          if (isDone) {
+            throw new NoSuchElementException();
+          }
+          advanced = false;
+          return next;
+        }
+
+        private void advance() {
+          assert !advanced;
+          assert !isDone;
+          advanced = true;
+
+          index++;
+          if (maybeAdvance()) {
+            return;
+          }
+
+          // We were unable to advance; advance tip to populate either head or tail.
+          tip.fastForward();
+          if (tip.hasNext()) {
+            tip.next();
+            seek(tag);
+          }
+
+          // At this point, either head or tail should be sufficient to advance.
+          assert maybeAdvance();
+        }
+
+        private boolean maybeAdvance() {
+          if (index < head.size()) {
+            // First consume head.
+            assert tailIter == null;
+            next = head.get(index);
+            return true;
+          } else if (tail != null) {
+            // Next consume tail, if any.
+            if (tailIter == null) {
+              tailIter =
+                  Iterators.transform(
+                      Iterators.filter(
+                          tail.copy(), taggedUnion -> taggedUnion.getUnionTag() == tag),
+                      taggedUnion -> (T) taggedUnion.getValue());
+            }
+            if (tailIter.hasNext()) {
+              next = tailIter.next();
+            } else {
+              isDone = true;
+            }
+            return true;
+          } else if (finished) {
+            // If there are no more elements in head, and tail was not populated, and we are
+            // finshed, this is the end of the iteration.

Review comment:
       ```suggestion
               // finished, this is the end of the iteration.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16354:
URL: https://github.com/apache/beam/pull/16354#discussion_r776842670



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -361,62 +377,332 @@ private CoGbkResult(CoGbkResultSchema schema, List<Iterable<?>> valueMap) {
+  private static class TagIterable<T> implements Iterable<T> {
+    int tag;
+    int cacheSize;
+    Supplier<Boolean> forceCache;

Review comment:
       ?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -98,8 +104,7 @@ public CoGbkResult(
         throw new IllegalStateException(
             "union tag " + unionTag + " has no corresponding tuple tag in the result schema");
       }
-      List<Object> valueList = (List<Object>) valueMap.get(unionTag);
-      valueList.add(value.getValue());
+      valuesByTag.get(unionTag).add(value.getValue());
     }
 
     if (taggedIter.hasNext()) {

Review comment:
       nit: use guard style
   
   ```
   if (!taggedIter.hasNext()) {
     valueMap = (List) valuesByTag;
     return;
   }
   
   // If we get here, there were more elements than we can afford to
   // keep in memory, so we copy the re-iterable of remaining items
   // and append filtered views to each of the sorted lists computed earlier.
   ...
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -59,6 +60,8 @@
 
   private static final int DEFAULT_IN_MEMORY_ELEMENT_COUNT = 10_000;
 
+  private static final int DEFAULT_MIN_ELEMENTS_PER_TAG = 100;

Review comment:
       Based upon the code it looks like this is used as a max per tag.
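
To illustrate that reading, here is a minimal sketch (not the PR's code) modelled on the `offer` method quoted earlier: once `head` holds `cacheSize` values, further values for that tag are no longer cached, so the limit behaves as a per-tag cap. The class `BoundedHeadCache` and its methods are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of a per-tag head cache; not the actual CoGbkResult code. */
public class BoundedHeadCache<T> {
  private final int cacheSize;
  private final List<T> head = new ArrayList<>();

  public BoundedHeadCache(int cacheSize) {
    this.cacheSize = cacheSize;
  }

  /** Stores the value only while fewer than cacheSize values are held; returns whether it was stored. */
  public boolean offer(T value) {
    if (head.size() < cacheSize) {
      head.add(value);
      return true;
    }
    // Beyond the cap: in this model the caller would have to re-read the value from the source.
    return false;
  }

  public int cachedCount() {
    return head.size();
  }

  public static void main(String[] args) {
    BoundedHeadCache<Integer> cache = new BoundedHeadCache<>(2);
    for (int i = 0; i < 5; i++) {
      cache.offer(i);
    }
    System.out.println(cache.cachedCount()); // prints 2
  }
}
```

In this model the constant bounds how many values are held per tag, which is why the name "min" can read as misleading.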







[GitHub] [beam] aaltay commented on pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #16354:
URL: https://github.com/apache/beam/pull/16354#issuecomment-1002189136


   Run Java PreCommit





[GitHub] [beam] robertwb commented on pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #16354:
URL: https://github.com/apache/beam/pull/16354#issuecomment-1002232655


   R: @emilymye or @aaltay or @lukecwik 





[GitHub] [beam] lukecwik commented on a change in pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16354:
URL: https://github.com/apache/beam/pull/16354#discussion_r776848950



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGbkResultTest.java
##########
@@ -57,6 +74,175 @@ public void runLazyResult(int cacheSize) {
     assertThat(result.getAll(new TupleTag<>("tag0")), contains(0, 2, 4));
   }
 
+  @Test
+  public void testLazyResults() {
+    TestUnionValues values = new TestUnionValues(0, 0, 1, 1, 0, 1, 1);
+    CoGbkResult result = new CoGbkResult(createSchema(5), values, 0, 2);
+    // Nothing is read until we try to iterate.
+    assertThat(values.maxPos(), equalTo(0));
+    Iterable<?> tag0iterable = result.getAll("tag0");
+    assertThat(values.maxPos(), equalTo(0));
+    tag0iterable.iterator();
+    assertThat(values.maxPos(), equalTo(0));
+
+    // Iterating reads (nearly) the minimal number of values.
+    Iterator<?> tag0 = tag0iterable.iterator();
+    tag0.next();
+    assertThat(values.maxPos(), lessThanOrEqualTo(2));
+    tag0.next();
+    assertThat(values.maxPos(), equalTo(2));
+    // Note that we're skipping over tag 1.
+    tag0.next();
+    assertThat(values.maxPos(), equalTo(5));
+
+    // Iterating again does not cause more reads.
+    Iterator<?> tag0iterAgain = tag0iterable.iterator();
+    tag0iterAgain.next();
+    tag0iterAgain.next();
+    tag0iterAgain.next();
+    assertThat(values.maxPos(), equalTo(5));
+
+    // Iterating over other tags does not cause more reads for values we have seen.
+    Iterator<?> tag1 = result.getAll("tag1").iterator();
+    tag1.next();
+    tag1.next();
+    assertThat(values.maxPos(), equalTo(5));
+    // However, finding the next tag1 value does require more reads.
+    tag1.next();
+    assertThat(values.maxPos(), equalTo(6));
+  }
+
+  @Test
+  @SuppressWarnings("BoxedPrimitiveEquality")
+  public void testCachedResults() {
+    // Ensure we don't fail below due to a non-default java.lang.Integer.IntegerCache.high setting.

Review comment:
       I understand that. I was trying to highlight the spacing issue in the comment. Also consider using a different type which isn't interned by the JVM.







[GitHub] [beam] robertwb commented on a change in pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #16354:
URL: https://github.com/apache/beam/pull/16354#discussion_r776850987



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGbkResultTest.java
##########
@@ -57,6 +74,175 @@ public void runLazyResult(int cacheSize) {
+  @Test
+  @SuppressWarnings("BoxedPrimitiveEquality")
+  public void testCachedResults() {
+    // Ensure we don't fail below due to a non-default java.lang.Integer.IntegerCache.high setting.

Review comment:
       Yeah. Strings have their own (more difficult to reason about) interning issues, and using a non-primitive gets really verbose here, which is why I stayed with integers. 
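
For context on the String point, a small sketch (illustrative only; the class and method names are hypothetical) of why reference equality on strings is harder to reason about: compile-time constant strings are interned, while strings created at run time are distinct objects unless `intern()` is called explicitly.

```java
/** Illustrative only; not part of the PR. */
public class StringInternDemo {
  /** Two identical literals refer to the same interned instance. */
  static boolean literalsShareInstance() {
    String a = "tag0";
    String b = "tag0";
    return a == b;
  }

  /** A string built with new is a distinct object from the literal. */
  static boolean freshCopySharesInstance() {
    String a = "tag0";
    String b = new String("tag0");
    return a == b;
  }

  /** Calling intern() maps the fresh copy back to the shared instance. */
  static boolean internedCopySharesInstance() {
    String a = "tag0";
    String b = new String("tag0").intern();
    return a == b;
  }

  public static void main(String[] args) {
    System.out.println(literalsShareInstance());      // prints true
    System.out.println(freshCopySharesInstance());    // prints false
    System.out.println(internedCopySharesInstance()); // prints true
  }
}
```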







[GitHub] [beam] robertwb commented on pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #16354:
URL: https://github.com/apache/beam/pull/16354#issuecomment-1003263021


   Same failure, in the unrelated org.apache.beam.runners.flink.FlinkRequiresStableInputTest.testParDoRequiresStableInput. I'm going to go ahead and merge so we can get a cherry-pick going.





[GitHub] [beam] robertwb merged pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
robertwb merged pull request #16354:
URL: https://github.com/apache/beam/pull/16354


   





[GitHub] [beam] robertwb commented on pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #16354:
URL: https://github.com/apache/beam/pull/16354#issuecomment-1001919689


   Run Java PreCommit





[GitHub] [beam] robertwb commented on a change in pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #16354:
URL: https://github.com/apache/beam/pull/16354#discussion_r776826720



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGbkResultTest.java
##########
@@ -57,6 +74,175 @@ public void runLazyResult(int cacheSize) {
+  @Test
+  @SuppressWarnings("BoxedPrimitiveEquality")
+  public void testCachedResults() {
+    // Ensure we don't fail below due to odd VM settings.

Review comment:
       java.lang.Integer.IntegerCache.high. Clarified to be more explicit.







[GitHub] [beam] robertwb commented on a change in pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #16354:
URL: https://github.com/apache/beam/pull/16354#discussion_r776847643



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGbkResultTest.java
##########
@@ -57,6 +74,175 @@ public void runLazyResult(int cacheSize) {
+  @Test
+  @SuppressWarnings("BoxedPrimitiveEquality")
+  public void testCachedResults() {
+    // Ensure we don't fail below due to a non-default java.lang.Integer.IntegerCache.high setting.

Review comment:
       The assertions about which values are cached vs. re-created rely on not-so-small integers (those above the JVM's boxed Integer cache range) not being cached.
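
For readers unfamiliar with the boxing behavior this comment relies on, here is a minimal sketch (not code from the pull request; `BoxedIdentityDemo` and `boxesToSameObject` are hypothetical names). `Integer.valueOf` always returns a shared object for values in -128..127, while for larger values the result depends on the upper bound of the Integer cache, which the JVM option `-XX:AutoBoxCacheMax` can raise. The sketch therefore prints the outcome for 1000 rather than assuming it.

```java
// Illustrative sketch only; not code from the pull request.
class BoxedIdentityDemo {
  /** Returns true if two independent boxings of n yield the same object. */
  static boolean boxesToSameObject(int n) {
    Integer a = Integer.valueOf(n);
    Integer b = Integer.valueOf(n);
    return a == b; // reference comparison, deliberately not equals()
  }

  public static void main(String[] args) {
    // Integer.valueOf always caches values in the range -128..127.
    System.out.println("127 shared: " + boxesToSameObject(127));
    // Depends on java.lang.Integer.IntegerCache.high, so the result is
    // printed rather than assumed.
    System.out.println("1000 shared: " + boxesToSameObject(1000));
  }
}
```

A test that distinguishes cached from re-created values by identity can only use integers for which the second check yields false, which is why the test guards against a non-default cache bound.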







[GitHub] [beam] robertwb commented on a change in pull request #16354: [BEAM-13541] More intelligent caching of CoGBK values.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #16354:
URL: https://github.com/apache/beam/pull/16354#discussion_r776847859



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -59,6 +60,8 @@
 
   private static final int DEFAULT_IN_MEMORY_ELEMENT_COUNT = 10_000;
 
+  private static final int DEFAULT_MIN_ELEMENTS_PER_TAG = 100;

Review comment:
       The idea is that we will always cache at least this many values per tag, regardless of whether DEFAULT_IN_MEMORY_ELEMENT_COUNT was "used up" by other tags. I'll clarify.
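
The policy described here can be sketched roughly as follows. This is a hypothetical illustration, not the actual `CoGbkResult` implementation: the class `PerTagCacheBudget`, the method `tryCache`, and the choice that reserved per-tag slots do not consume the shared budget are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; names and structure are assumptions, not CoGbkResult's code.
class PerTagCacheBudget {
  private final int sharedBudget;       // plays the role of the global in-memory element count
  private final int minElementsPerTag;  // plays the role of the per-tag minimum
  private final Map<Integer, Integer> cachedPerTag = new HashMap<>();
  private int sharedUsed = 0;

  PerTagCacheBudget(int sharedBudget, int minElementsPerTag) {
    this.sharedBudget = sharedBudget;
    this.minElementsPerTag = minElementsPerTag;
  }

  /** Decides whether the next value for the given tag is cached, and records the decision. */
  boolean tryCache(int tag) {
    int cached = cachedPerTag.getOrDefault(tag, 0);
    if (cached < minElementsPerTag) {
      // Reserved slot: always available, even if other tags exhausted the shared budget.
    } else if (sharedUsed < sharedBudget) {
      sharedUsed++; // Beyond the per-tag minimum, draw from the shared budget.
    } else {
      return false; // Nothing left; the value would have to be re-read later.
    }
    cachedPerTag.put(tag, cached + 1);
    return true;
  }

  public static void main(String[] args) {
    PerTagCacheBudget budget = new PerTagCacheBudget(3, 2);
    int tag0 = 0;
    int tag1 = 0;
    for (int i = 0; i < 10; i++) {
      if (budget.tryCache(0)) tag0++;
    }
    for (int i = 0; i < 10; i++) {
      if (budget.tryCache(1)) tag1++;
    }
    System.out.println("tag0 cached: " + tag0); // 2 reserved + 3 shared = 5
    System.out.println("tag1 cached: " + tag1); // shared budget is gone, 2 reserved remain
  }
}
```

With a shared budget of 3 and a per-tag minimum of 2, the first tag uses up the shared budget, yet the second tag still gets its two reserved slots.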



