Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/09/07 22:27:59 UTC

[GitHub] [beam] lukecwik opened a new pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

lukecwik opened a new pull request #15475:
URL: https://github.com/apache/beam/pull/15475


   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kileys commented on a change in pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

Posted by GitBox <gi...@apache.org>.
kileys commented on a change in pull request #15475:
URL: https://github.com/apache/beam/pull/15475#discussion_r705654773



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
##########
@@ -195,8 +245,17 @@ public ByteString firstPage() {
       this.continuationToken = stateRequestForFirstChunk.getGet().getContinuationToken();
     }
 
-    private void prefetch() {
-      if (prefetchedResponse == null && currentState == State.READ_REQUIRED) {
+    @Override
+    public boolean isReady() {
+      if (prefetchedResponse == null) {
+        return currentState != State.READ_REQUIRED;
+      }
+      return prefetchedResponse.isDone();
+    }
+
+    @Override
+    public void prefetch() {
+      if (!isReady()) {
         prefetchedResponse = new CompletableFuture<>();

Review comment:
       Is there a risk of overwriting an ongoing prefetch? Do we need to check that prefetchedResponse is null?
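
To illustrate the concern: in the diff above, `isReady()` returns `prefetchedResponse.isDone()` once a prefetch has started, so while that request is still in flight `!isReady()` is true and a second `prefetch()` call would replace the pending future. The following is a minimal sketch of a guard that ignores the second call. It is a simplified stand-in rather than the Beam class: `GuardedPrefetcher`, `completeOutstanding`, and `requestsIssued` are hypothetical names, and a counter stands in for sending the state request.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified stand-in for a prefetching iterator; not Beam's implementation. */
class GuardedPrefetcher {
  private final AtomicInteger requestsIssued = new AtomicInteger();
  private CompletableFuture<String> prefetchedResponse; // null until a prefetch starts
  private boolean readRequired = true; // stands in for State.READ_REQUIRED

  /** Ready when no read is needed, or when the in-flight response has completed. */
  boolean isReady() {
    if (prefetchedResponse == null) {
      return !readRequired;
    }
    return prefetchedResponse.isDone();
  }

  /** Starts a request only when none is in flight, so a pending future is never replaced. */
  void prefetch() {
    if (prefetchedResponse == null && readRequired) {
      prefetchedResponse = new CompletableFuture<>();
      requestsIssued.incrementAndGet(); // assumption: stands in for issuing the state request
    }
  }

  /** Simulates the runner answering the outstanding request. */
  void completeOutstanding(String value) {
    prefetchedResponse.complete(value);
  }

  int requestsIssued() {
    return requestsIssued.get();
  }

  public static void main(String[] args) {
    GuardedPrefetcher prefetcher = new GuardedPrefetcher();
    prefetcher.prefetch();
    prefetcher.prefetch(); // ignored: the first request is still pending
    System.out.println("requests issued: " + prefetcher.requestsIssued());
  }
}
```

Checking for a null `prefetchedResponse` directly, rather than going through `isReady()`, is what keeps a pending request from being discarded in this sketch.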

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
##########
@@ -19,29 +19,72 @@
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import org.apache.beam.fn.harness.state.StateFetchingIterators.FirstPageAndRemainder;
 import org.apache.beam.fn.harness.state.StateFetchingIterators.LazyBlockingStateFetchingIterator;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateGetResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterator;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /** Tests for {@link StateFetchingIterators}. */
+@RunWith(Enclosed.class)
 public class StateFetchingIteratorsTest {
+
+  private static BeamFnStateClient fakeStateClient(

Review comment:
       Could this use the existing `FakeBeamFnStateClient` instead? https://github.com/apache/beam/blob/master/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java







[GitHub] [beam] TheNeuralBit commented on a change in pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #15475:
URL: https://github.com/apache/beam/pull/15475#discussion_r762219693



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
##########
@@ -77,49 +121,55 @@ public void testMultiWithEmptyByteStrings() throws Exception {
           ByteString.EMPTY);
     }
 
-    private BeamFnStateClient fakeStateClient(AtomicInteger callCount, ByteString... expected) {
-      return (requestBuilder, response) -> {
-        callCount.incrementAndGet();
-        if (expected.length == 0) {
-          response.complete(
-              StateResponse.newBuilder()
-                  .setId(requestBuilder.getId())
-                  .setGet(StateGetResponse.newBuilder())
-                  .build());
-          return;
-        }
-
-        ByteString continuationToken = requestBuilder.getGet().getContinuationToken();
-
-        int requestedPosition = 0; // Default position is 0
-        if (!ByteString.EMPTY.equals(continuationToken)) {
-          requestedPosition = Integer.parseInt(continuationToken.toStringUtf8());
-        }
-
-        // Compute the new continuation token
-        ByteString newContinuationToken = ByteString.EMPTY;
-        if (requestedPosition != expected.length - 1) {
-          newContinuationToken = ByteString.copyFromUtf8(Integer.toString(requestedPosition + 1));
-        }
-        response.complete(
-            StateResponse.newBuilder()
-                .setId(requestBuilder.getId())
-                .setGet(
-                    StateGetResponse.newBuilder()
-                        .setData(expected[requestedPosition])
-                        .setContinuationToken(newContinuationToken))
-                .build());
-      };
+    @Test
+    public void testPrefetchIgnoredWhenExistingPrefetchOngoing() throws Exception {
+      AtomicInteger callCount = new AtomicInteger();
+      BeamFnStateClient fakeStateClient =
+          new BeamFnStateClient() {
+            @Override
+            public void handle(
+                StateRequest.Builder requestBuilder, CompletableFuture<StateResponse> response) {
+              callCount.incrementAndGet();
+            }
+          };
+      PrefetchableIterator<ByteString> byteStrings =
+          new LazyBlockingStateFetchingIterator(fakeStateClient, StateRequest.getDefaultInstance());
+      assertEquals(0, callCount.get());
+      byteStrings.prefetch();
+      assertEquals(1, callCount.get()); // first prefetch
+      byteStrings.prefetch();
+      assertEquals(1, callCount.get()); // subsequent is ignored
     }
 
     private void testFetch(ByteString... expected) {
       AtomicInteger callCount = new AtomicInteger();
       BeamFnStateClient fakeStateClient = fakeStateClient(callCount, expected);
-      Iterator<ByteString> byteStrings =
+      PrefetchableIterator<ByteString> byteStrings =
           new LazyBlockingStateFetchingIterator(fakeStateClient, StateRequest.getDefaultInstance());
       assertEquals(0, callCount.get()); // Ensure it's fully lazy.
-      assertArrayEquals(expected, Iterators.toArray(byteStrings, Object.class));
+      assertFalse(byteStrings.isReady());
+
+      // Prefetch every second element in the iterator capturing the results
+      List<ByteString> results = new ArrayList<>();

Review comment:
       Oh yes, sorry, I found the wrong line. Thanks, we can make that change in #15890.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #15475:
URL: https://github.com/apache/beam/pull/15475#discussion_r762215539



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
##########
@@ -77,49 +121,55 @@ public void testMultiWithEmptyByteStrings() throws Exception {
           ByteString.EMPTY);
     }
 
-    private BeamFnStateClient fakeStateClient(AtomicInteger callCount, ByteString... expected) {
-      return (requestBuilder, response) -> {
-        callCount.incrementAndGet();
-        if (expected.length == 0) {
-          response.complete(
-              StateResponse.newBuilder()
-                  .setId(requestBuilder.getId())
-                  .setGet(StateGetResponse.newBuilder())
-                  .build());
-          return;
-        }
-
-        ByteString continuationToken = requestBuilder.getGet().getContinuationToken();
-
-        int requestedPosition = 0; // Default position is 0
-        if (!ByteString.EMPTY.equals(continuationToken)) {
-          requestedPosition = Integer.parseInt(continuationToken.toStringUtf8());
-        }
-
-        // Compute the new continuation token
-        ByteString newContinuationToken = ByteString.EMPTY;
-        if (requestedPosition != expected.length - 1) {
-          newContinuationToken = ByteString.copyFromUtf8(Integer.toString(requestedPosition + 1));
-        }
-        response.complete(
-            StateResponse.newBuilder()
-                .setId(requestBuilder.getId())
-                .setGet(
-                    StateGetResponse.newBuilder()
-                        .setData(expected[requestedPosition])
-                        .setContinuationToken(newContinuationToken))
-                .build());
-      };
+    @Test
+    public void testPrefetchIgnoredWhenExistingPrefetchOngoing() throws Exception {
+      AtomicInteger callCount = new AtomicInteger();
+      BeamFnStateClient fakeStateClient =
+          new BeamFnStateClient() {
+            @Override
+            public void handle(
+                StateRequest.Builder requestBuilder, CompletableFuture<StateResponse> response) {
+              callCount.incrementAndGet();
+            }
+          };
+      PrefetchableIterator<ByteString> byteStrings =
+          new LazyBlockingStateFetchingIterator(fakeStateClient, StateRequest.getDefaultInstance());
+      assertEquals(0, callCount.get());
+      byteStrings.prefetch();
+      assertEquals(1, callCount.get()); // first prefetch
+      byteStrings.prefetch();
+      assertEquals(1, callCount.get()); // subsequent is ignored
     }
 
     private void testFetch(ByteString... expected) {
       AtomicInteger callCount = new AtomicInteger();
       BeamFnStateClient fakeStateClient = fakeStateClient(callCount, expected);
-      Iterator<ByteString> byteStrings =
+      PrefetchableIterator<ByteString> byteStrings =
           new LazyBlockingStateFetchingIterator(fakeStateClient, StateRequest.getDefaultInstance());
       assertEquals(0, callCount.get()); // Ensure it's fully lazy.
-      assertArrayEquals(expected, Iterators.toArray(byteStrings, Object.class));
+      assertFalse(byteStrings.isReady());
+
+      // Prefetch every second element in the iterator capturing the results
+      List<ByteString> results = new ArrayList<>();

Review comment:
       @lukecwik did you intend to make an assertion on this?
   
   error-prone checks in https://github.com/apache/beam/pull/15890 identify it as unused.
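
For illustration only, one way the collected `results` list could be given a purpose is to compare it against the expected elements once the iterator is drained. This is a sketch under assumptions: it uses plain JDK types in place of `PrefetchableIterator` and `ByteString`, the names `CollectAndAssert`, `drain`, and `assertSameElements` are hypothetical, and it does not claim to be the change that was made in #15890.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch; plain JDK types stand in for the Beam iterator and ByteString. */
class CollectAndAssert {
  /** Drains the iterator into a list, mirroring the test's `results` accumulator. */
  static <T> List<T> drain(Iterator<T> iterator) {
    List<T> results = new ArrayList<>();
    while (iterator.hasNext()) {
      results.add(iterator.next());
    }
    return results;
  }

  /** Fails if the collected elements differ from the expected ones, in order. */
  static <T> void assertSameElements(List<T> expected, List<T> results) {
    if (!expected.equals(results)) {
      throw new AssertionError("expected " + expected + " but got " + results);
    }
  }

  public static void main(String[] args) {
    List<String> expected = Arrays.asList("A", "B", "C");
    List<String> results = drain(expected.iterator());
    assertSameElements(expected, results); // the check that makes `results` a used variable
  }
}
```

In the JUnit test itself the equivalent check would presumably be an `assertEquals` or `assertArrayEquals` call on `results`, which would also satisfy the unused-variable check.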







[GitHub] [beam] lukecwik commented on pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15475:
URL: https://github.com/apache/beam/pull/15475#issuecomment-914669066


   R: @amaliujia @pskevin @kileys





[GitHub] [beam] lukecwik commented on pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15475:
URL: https://github.com/apache/beam/pull/15475#issuecomment-914766679


   Run Java PreCommit





[GitHub] [beam] lukecwik removed a comment on pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on pull request #15475:
URL: https://github.com/apache/beam/pull/15475#issuecomment-914766679


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15475:
URL: https://github.com/apache/beam/pull/15475#issuecomment-914670462


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15475:
URL: https://github.com/apache/beam/pull/15475#issuecomment-914794485


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on a change in pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15475:
URL: https://github.com/apache/beam/pull/15475#discussion_r705854833



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
##########
@@ -19,29 +19,72 @@
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import org.apache.beam.fn.harness.state.StateFetchingIterators.FirstPageAndRemainder;
 import org.apache.beam.fn.harness.state.StateFetchingIterators.LazyBlockingStateFetchingIterator;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateGetResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterator;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /** Tests for {@link StateFetchingIterators}. */
+@RunWith(Enclosed.class)
 public class StateFetchingIteratorsTest {
+
+  private static BeamFnStateClient fakeStateClient(

Review comment:
       This isn't a good fit because we want to control the pagination precisely.
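
Controlling pagination precisely in a test usually means the fake itself decides where each page boundary falls. A minimal sketch in the spirit of the `fakeStateClient` helper in this diff, serving exactly one element per request and using the next index as the continuation token. Plain strings stand in for Beam's `StateRequest`/`StateResponse`, and `PagedFakeSketch`, `Page`, `fetch` and `requestsToReadAll` are hypothetical names used only for illustration:

```java
class PagedFakeSketch {
  /** One page of data plus the token for the next page ("" means no more pages). */
  static final class Page {
    final String data;
    final String continuationToken;

    Page(String data, String continuationToken) {
      this.data = data;
      this.continuationToken = continuationToken;
    }
  }

  /** Hypothetical fake: every request returns exactly one element. */
  static Page fetch(String continuationToken, String... elements) {
    if (elements.length == 0) {
      return new Page("", ""); // a single empty page
    }
    // An empty token means "start from the beginning".
    int position = continuationToken.isEmpty() ? 0 : Integer.parseInt(continuationToken);
    // The last element gets an empty token so the reader knows to stop.
    String next = position == elements.length - 1 ? "" : Integer.toString(position + 1);
    return new Page(elements[position], next);
  }

  /** Reads every page and returns how many requests that took. */
  static int requestsToReadAll(String... elements) {
    int requests = 0;
    String token = "";
    do {
      Page page = fetch(token, elements);
      requests++;
      token = page.continuationToken;
    } while (!token.isEmpty());
    return requests;
  }

  public static void main(String[] args) {
    System.out.println(requestsToReadAll("a", "b", "c"));
  }
}
```

Because the fake fixes the page size at one element, a test can assert on the exact number of requests that a given number of reads or prefetches should cause.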







[GitHub] [beam] lukecwik merged pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #15475:
URL: https://github.com/apache/beam/pull/15475


   





[GitHub] [beam] lukecwik commented on a change in pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15475:
URL: https://github.com/apache/beam/pull/15475#discussion_r762216602



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
##########
@@ -77,49 +121,55 @@ public void testMultiWithEmptyByteStrings() throws Exception {
           ByteString.EMPTY);
     }
 
-    private BeamFnStateClient fakeStateClient(AtomicInteger callCount, ByteString... expected) {
-      return (requestBuilder, response) -> {
-        callCount.incrementAndGet();
-        if (expected.length == 0) {
-          response.complete(
-              StateResponse.newBuilder()
-                  .setId(requestBuilder.getId())
-                  .setGet(StateGetResponse.newBuilder())
-                  .build());
-          return;
-        }
-
-        ByteString continuationToken = requestBuilder.getGet().getContinuationToken();
-
-        int requestedPosition = 0; // Default position is 0
-        if (!ByteString.EMPTY.equals(continuationToken)) {
-          requestedPosition = Integer.parseInt(continuationToken.toStringUtf8());
-        }
-
-        // Compute the new continuation token
-        ByteString newContinuationToken = ByteString.EMPTY;
-        if (requestedPosition != expected.length - 1) {
-          newContinuationToken = ByteString.copyFromUtf8(Integer.toString(requestedPosition + 1));
-        }
-        response.complete(
-            StateResponse.newBuilder()
-                .setId(requestBuilder.getId())
-                .setGet(
-                    StateGetResponse.newBuilder()
-                        .setData(expected[requestedPosition])
-                        .setContinuationToken(newContinuationToken))
-                .build());
-      };
+    @Test
+    public void testPrefetchIgnoredWhenExistingPrefetchOngoing() throws Exception {
+      AtomicInteger callCount = new AtomicInteger();
+      BeamFnStateClient fakeStateClient =
+          new BeamFnStateClient() {
+            @Override
+            public void handle(
+                StateRequest.Builder requestBuilder, CompletableFuture<StateResponse> response) {
+              callCount.incrementAndGet();
+            }
+          };
+      PrefetchableIterator<ByteString> byteStrings =
+          new LazyBlockingStateFetchingIterator(fakeStateClient, StateRequest.getDefaultInstance());
+      assertEquals(0, callCount.get());
+      byteStrings.prefetch();
+      assertEquals(1, callCount.get()); // first prefetch
+      byteStrings.prefetch();
+      assertEquals(1, callCount.get()); // subsequent is ignored
     }
 
     private void testFetch(ByteString... expected) {
       AtomicInteger callCount = new AtomicInteger();
       BeamFnStateClient fakeStateClient = fakeStateClient(callCount, expected);
-      Iterator<ByteString> byteStrings =
+      PrefetchableIterator<ByteString> byteStrings =
           new LazyBlockingStateFetchingIterator(fakeStateClient, StateRequest.getDefaultInstance());
       assertEquals(0, callCount.get()); // Ensure it's fully lazy.
-      assertArrayEquals(expected, Iterators.toArray(byteStrings, Object.class));
+      assertFalse(byteStrings.isReady());
+
+      // Prefetch every second element in the iterator capturing the results
+      List<ByteString> results = new ArrayList<>();

Review comment:
       That seems like an odd failure, because it is used further down, on line 167.







[GitHub] [beam] lukecwik commented on a change in pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15475:
URL: https://github.com/apache/beam/pull/15475#discussion_r705760596



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
##########
@@ -195,8 +245,17 @@ public ByteString firstPage() {
       this.continuationToken = stateRequestForFirstChunk.getGet().getContinuationToken();
     }
 
-    private void prefetch() {
-      if (prefetchedResponse == null && currentState == State.READ_REQUIRED) {
+    @Override
+    public boolean isReady() {
+      if (prefetchedResponse == null) {
+        return currentState != State.READ_REQUIRED;
+      }
+      return prefetchedResponse.isDone();
+    }
+
+    @Override
+    public void prefetch() {
+      if (!isReady()) {
         prefetchedResponse = new CompletableFuture<>();

Review comment:
       Yes.

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
##########
@@ -195,8 +245,17 @@ public ByteString firstPage() {
       this.continuationToken = stateRequestForFirstChunk.getGet().getContinuationToken();
     }
 
-    private void prefetch() {
-      if (prefetchedResponse == null && currentState == State.READ_REQUIRED) {
+    @Override
+    public boolean isReady() {
+      if (prefetchedResponse == null) {
+        return currentState != State.READ_REQUIRED;
+      }
+      return prefetchedResponse.isDone();
+    }
+
+    @Override
+    public void prefetch() {
+      if (!isReady()) {
         prefetchedResponse = new CompletableFuture<>();

Review comment:
       Yes
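
For readers following the hunk above, here is a minimal, self-contained sketch of an `isReady()`/`prefetch()` pair in which a second `prefetch()` is ignored while a request is still in flight, the behaviour that `testPrefetchIgnoredWhenExistingPrefetchOngoing` checks. The types are hypothetical stand-ins rather than Beam's classes, and the in-flight check inside `prefetch()` is an assumption made for this sketch, not necessarily the merged code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class PrefetchGuardSketch {
  enum State { READ_REQUIRED, HAS_NEXT }

  /** Hypothetical stand-in for a state client; it receives the future to complete. */
  interface Client {
    void handle(CompletableFuture<String> response);
  }

  static class SketchIterator {
    private final Client client;
    private final State currentState = State.READ_REQUIRED;
    private CompletableFuture<String> prefetchedResponse; // null until a request is issued

    SketchIterator(Client client) {
      this.client = client;
    }

    boolean isReady() {
      if (prefetchedResponse == null) {
        return currentState != State.READ_REQUIRED;
      }
      return prefetchedResponse.isDone();
    }

    void prefetch() {
      // Assumption: issue a request only when a read is required and none is in flight.
      if (currentState == State.READ_REQUIRED && prefetchedResponse == null) {
        prefetchedResponse = new CompletableFuture<>();
        client.handle(prefetchedResponse);
      }
    }
  }

  /** Calls prefetch() twice against a client that never completes; returns the request count. */
  static int requestsAfterTwoPrefetches() {
    AtomicInteger callCount = new AtomicInteger();
    SketchIterator it = new SketchIterator(response -> callCount.incrementAndGet());
    it.prefetch();
    it.prefetch();
    return callCount.get();
  }

  public static void main(String[] args) {
    System.out.println(requestsAfterTwoPrefetches());
  }
}
```

Note that guarding on `!isReady()` alone would not be enough for this behaviour in the sketch: while the future is pending, `isReady()` returns false, so a second request would be sent.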







[GitHub] [beam] lukecwik removed a comment on pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on pull request #15475:
URL: https://github.com/apache/beam/pull/15475#issuecomment-914670462


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on a change in pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15475:
URL: https://github.com/apache/beam/pull/15475#discussion_r762218309



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
##########
@@ -77,49 +121,55 @@ public void testMultiWithEmptyByteStrings() throws Exception {
           ByteString.EMPTY);
     }
 
-    private BeamFnStateClient fakeStateClient(AtomicInteger callCount, ByteString... expected) {
-      return (requestBuilder, response) -> {
-        callCount.incrementAndGet();
-        if (expected.length == 0) {
-          response.complete(
-              StateResponse.newBuilder()
-                  .setId(requestBuilder.getId())
-                  .setGet(StateGetResponse.newBuilder())
-                  .build());
-          return;
-        }
-
-        ByteString continuationToken = requestBuilder.getGet().getContinuationToken();
-
-        int requestedPosition = 0; // Default position is 0
-        if (!ByteString.EMPTY.equals(continuationToken)) {
-          requestedPosition = Integer.parseInt(continuationToken.toStringUtf8());
-        }
-
-        // Compute the new continuation token
-        ByteString newContinuationToken = ByteString.EMPTY;
-        if (requestedPosition != expected.length - 1) {
-          newContinuationToken = ByteString.copyFromUtf8(Integer.toString(requestedPosition + 1));
-        }
-        response.complete(
-            StateResponse.newBuilder()
-                .setId(requestBuilder.getId())
-                .setGet(
-                    StateGetResponse.newBuilder()
-                        .setData(expected[requestedPosition])
-                        .setContinuationToken(newContinuationToken))
-                .build());
-      };
+    @Test
+    public void testPrefetchIgnoredWhenExistingPrefetchOngoing() throws Exception {
+      AtomicInteger callCount = new AtomicInteger();
+      BeamFnStateClient fakeStateClient =
+          new BeamFnStateClient() {
+            @Override
+            public void handle(
+                StateRequest.Builder requestBuilder, CompletableFuture<StateResponse> response) {
+              callCount.incrementAndGet();
+            }
+          };
+      PrefetchableIterator<ByteString> byteStrings =
+          new LazyBlockingStateFetchingIterator(fakeStateClient, StateRequest.getDefaultInstance());
+      assertEquals(0, callCount.get());
+      byteStrings.prefetch();
+      assertEquals(1, callCount.get()); // first prefetch
+      byteStrings.prefetch();
+      assertEquals(1, callCount.get()); // subsequent is ignored
     }
 
     private void testFetch(ByteString... expected) {
       AtomicInteger callCount = new AtomicInteger();
       BeamFnStateClient fakeStateClient = fakeStateClient(callCount, expected);
-      Iterator<ByteString> byteStrings =
+      PrefetchableIterator<ByteString> byteStrings =
           new LazyBlockingStateFetchingIterator(fakeStateClient, StateRequest.getDefaultInstance());
       assertEquals(0, callCount.get()); // Ensure it's fully lazy.
-      assertArrayEquals(expected, Iterators.toArray(byteStrings, Object.class));
+      assertFalse(byteStrings.isReady());
+
+      // Prefetch every second element in the iterator capturing the results
+      List<ByteString> results = new ArrayList<>();

Review comment:
       I think you're referring to line 242 now, and that should likely read:
   ```
   assertArrayEquals(expected, Iterables.toArray(results, Object.class));
   ```
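
The point of the suggestion is that a list filled while iterating has to be compared against the expected values; otherwise nothing ever reads it, which is what an unused-variable check reports. A minimal sketch of the pattern using only the JDK: `Arrays.equals` and `List.toArray` stand in for JUnit's `assertArrayEquals` and Guava's `Iterables.toArray`, and `CollectedResultsSketch` and `collectedMatches` are hypothetical names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class CollectedResultsSketch {
  /** Drains the iterator into a list and reports whether it matches the expected elements. */
  static boolean collectedMatches(Iterator<String> source, String... expected) {
    List<String> results = new ArrayList<>();
    while (source.hasNext()) {
      results.add(source.next());
    }
    // Without this comparison, results would be written but never read.
    return Arrays.equals(expected, results.toArray());
  }

  public static void main(String[] args) {
    Iterator<String> source = Arrays.asList("a", "b", "c").iterator();
    System.out.println(collectedMatches(source, "a", "b", "c"));
  }
}
```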







[GitHub] [beam] lukecwik commented on a change in pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15475:
URL: https://github.com/apache/beam/pull/15475#discussion_r762217256



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
##########
@@ -77,49 +121,55 @@ public void testMultiWithEmptyByteStrings() throws Exception {
           ByteString.EMPTY);
     }
 
-    private BeamFnStateClient fakeStateClient(AtomicInteger callCount, ByteString... expected) {
-      return (requestBuilder, response) -> {
-        callCount.incrementAndGet();
-        if (expected.length == 0) {
-          response.complete(
-              StateResponse.newBuilder()
-                  .setId(requestBuilder.getId())
-                  .setGet(StateGetResponse.newBuilder())
-                  .build());
-          return;
-        }
-
-        ByteString continuationToken = requestBuilder.getGet().getContinuationToken();
-
-        int requestedPosition = 0; // Default position is 0
-        if (!ByteString.EMPTY.equals(continuationToken)) {
-          requestedPosition = Integer.parseInt(continuationToken.toStringUtf8());
-        }
-
-        // Compute the new continuation token
-        ByteString newContinuationToken = ByteString.EMPTY;
-        if (requestedPosition != expected.length - 1) {
-          newContinuationToken = ByteString.copyFromUtf8(Integer.toString(requestedPosition + 1));
-        }
-        response.complete(
-            StateResponse.newBuilder()
-                .setId(requestBuilder.getId())
-                .setGet(
-                    StateGetResponse.newBuilder()
-                        .setData(expected[requestedPosition])
-                        .setContinuationToken(newContinuationToken))
-                .build());
-      };
+    @Test
+    public void testPrefetchIgnoredWhenExistingPrefetchOngoing() throws Exception {
+      AtomicInteger callCount = new AtomicInteger();
+      BeamFnStateClient fakeStateClient =
+          new BeamFnStateClient() {
+            @Override
+            public void handle(
+                StateRequest.Builder requestBuilder, CompletableFuture<StateResponse> response) {
+              callCount.incrementAndGet();
+            }
+          };
+      PrefetchableIterator<ByteString> byteStrings =
+          new LazyBlockingStateFetchingIterator(fakeStateClient, StateRequest.getDefaultInstance());
+      assertEquals(0, callCount.get());
+      byteStrings.prefetch();
+      assertEquals(1, callCount.get()); // first prefetch
+      byteStrings.prefetch();
+      assertEquals(1, callCount.get()); // subsequent is ignored
     }
 
     private void testFetch(ByteString... expected) {
       AtomicInteger callCount = new AtomicInteger();
       BeamFnStateClient fakeStateClient = fakeStateClient(callCount, expected);
-      Iterator<ByteString> byteStrings =
+      PrefetchableIterator<ByteString> byteStrings =
           new LazyBlockingStateFetchingIterator(fakeStateClient, StateRequest.getDefaultInstance());
       assertEquals(0, callCount.get()); // Ensure it's fully lazy.
-      assertArrayEquals(expected, Iterators.toArray(byteStrings, Object.class));
+      assertFalse(byteStrings.isReady());
+
+      // Prefetch every second element in the iterator capturing the results
+      List<ByteString> results = new ArrayList<>();

Review comment:
       I think that was a bug in the original test, and it has now been fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15475:
URL: https://github.com/apache/beam/pull/15475#issuecomment-916586573


   PTAL, the latest commit should have the diff





[GitHub] [beam] lukecwik commented on pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15475:
URL: https://github.com/apache/beam/pull/15475#issuecomment-914876048


   Run Java PreCommit

