You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/03 20:12:47 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #15475: [BEAM-12802] Add support for prefetch through data layers down through the State API building on previous commits.

TheNeuralBit commented on a change in pull request #15475:
URL: https://github.com/apache/beam/pull/15475#discussion_r762215539



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
##########
@@ -77,49 +121,55 @@ public void testMultiWithEmptyByteStrings() throws Exception {
           ByteString.EMPTY);
     }
 
-    private BeamFnStateClient fakeStateClient(AtomicInteger callCount, ByteString... expected) {
-      return (requestBuilder, response) -> {
-        callCount.incrementAndGet();
-        if (expected.length == 0) {
-          response.complete(
-              StateResponse.newBuilder()
-                  .setId(requestBuilder.getId())
-                  .setGet(StateGetResponse.newBuilder())
-                  .build());
-          return;
-        }
-
-        ByteString continuationToken = requestBuilder.getGet().getContinuationToken();
-
-        int requestedPosition = 0; // Default position is 0
-        if (!ByteString.EMPTY.equals(continuationToken)) {
-          requestedPosition = Integer.parseInt(continuationToken.toStringUtf8());
-        }
-
-        // Compute the new continuation token
-        ByteString newContinuationToken = ByteString.EMPTY;
-        if (requestedPosition != expected.length - 1) {
-          newContinuationToken = ByteString.copyFromUtf8(Integer.toString(requestedPosition + 1));
-        }
-        response.complete(
-            StateResponse.newBuilder()
-                .setId(requestBuilder.getId())
-                .setGet(
-                    StateGetResponse.newBuilder()
-                        .setData(expected[requestedPosition])
-                        .setContinuationToken(newContinuationToken))
-                .build());
-      };
+    @Test
+    public void testPrefetchIgnoredWhenExistingPrefetchOngoing() throws Exception {
+      AtomicInteger callCount = new AtomicInteger();
+      BeamFnStateClient fakeStateClient =
+          new BeamFnStateClient() {
+            @Override
+            public void handle(
+                StateRequest.Builder requestBuilder, CompletableFuture<StateResponse> response) {
+              callCount.incrementAndGet();
+            }
+          };
+      PrefetchableIterator<ByteString> byteStrings =
+          new LazyBlockingStateFetchingIterator(fakeStateClient, StateRequest.getDefaultInstance());
+      assertEquals(0, callCount.get());
+      byteStrings.prefetch();
+      assertEquals(1, callCount.get()); // first prefetch
+      byteStrings.prefetch();
+      assertEquals(1, callCount.get()); // subsequent is ignored
     }
 
     private void testFetch(ByteString... expected) {
       AtomicInteger callCount = new AtomicInteger();
       BeamFnStateClient fakeStateClient = fakeStateClient(callCount, expected);
-      Iterator<ByteString> byteStrings =
+      PrefetchableIterator<ByteString> byteStrings =
           new LazyBlockingStateFetchingIterator(fakeStateClient, StateRequest.getDefaultInstance());
       assertEquals(0, callCount.get()); // Ensure it's fully lazy.
-      assertArrayEquals(expected, Iterators.toArray(byteStrings, Object.class));
+      assertFalse(byteStrings.isReady());
+
+      // Prefetch every second element in the iterator capturing the results
+      List<ByteString> results = new ArrayList<>();

Review comment:
       @lukecwik did you intend to make an assertion on this?
   
   error-prone checks in https://github.com/apache/beam/pull/15890 identify it as unused.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org