You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/24 00:39:59 UTC

[GitHub] [beam] y1chi commented on a change in pull request #16057: [BEAM-13015] Simplify StateFetchingIterators.LazyBlockingStateFetchingIterator

y1chi commented on a change in pull request #16057:
URL: https://github.com/apache/beam/pull/16057#discussion_r755611072



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
##########
@@ -265,48 +258,39 @@ public void prefetch() {
 
     @Override
     public boolean hasNext() {
-      switch (currentState) {
-        case EOF:
-          return false;
-        case READ_REQUIRED:
-          prefetch();
-          StateResponse stateResponse;
-          try {
-            stateResponse = prefetchedResponse.get();
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IllegalStateException(e);
-          } catch (ExecutionException e) {
-            if (e.getCause() == null) {
-              throw new IllegalStateException(e);
-            }
-            Throwables.throwIfUnchecked(e.getCause());
-            throw new IllegalStateException(e.getCause());
-          }
-          prefetchedResponse = null;
-          continuationToken = stateResponse.getGet().getContinuationToken();
-          next = stateResponse.getGet().getData();
-          currentState = State.HAS_NEXT;
-          return true;
-        case HAS_NEXT:
-          return true;
-      }
-      throw new IllegalStateException(String.format("Unknown state %s", currentState));
+      return moreToRead;
     }
 
     @Override
     public ByteString next() {
       if (!hasNext()) {
         throw new NoSuchElementException();
       }
+
+      prefetch();
+      StateResponse stateResponse;
+      try {
+        stateResponse = prefetchedResponse.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(e);
+      } catch (ExecutionException e) {
+        if (e.getCause() == null) {
+          throw new IllegalStateException(e);
+        }
+        Throwables.throwIfUnchecked(e.getCause());
+        throw new IllegalStateException(e.getCause());
+      }
+      prefetchedResponse = null;
+      continuationToken = stateResponse.getGet().getContinuationToken();
+
       // If the continuation token is empty, that means we have reached EOF.
       if (ByteString.EMPTY.equals(continuationToken)) {
-        currentState = State.EOF;
+        moreToRead = false;
       } else {
-        currentState = State.READ_REQUIRED;
         prefetch();

Review comment:
       do we still need this prefetch?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org