You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/26 23:38:52 UTC

[GitHub] [beam] reuvenlax commented on a diff in pull request #22461: Fixes #22438. Ensure that WindmillStateReader completes all batched read futures on exceptions

reuvenlax commented on code in PR #22461:
URL: https://github.com/apache/beam/pull/22461#discussion_r930496722


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java:
##########
@@ -453,30 +453,40 @@ private <ResultT, ContinuationT> Future<Iterable<ResultT>> valuesToPagingIterabl
   public void startBatchAndBlock() {
     // First, drain work out of the pending lookups into a set. These will be the items we fetch.
     HashSet<StateTag<?>> toFetch = Sets.newHashSet();
-    while (!pendingLookups.isEmpty()) {
-      StateTag<?> stateTag = pendingLookups.poll();
-      if (stateTag == null) {
-        break;
+    try {
+      while (!pendingLookups.isEmpty()) {
+        StateTag<?> stateTag = pendingLookups.poll();
+        if (stateTag == null) {
+          break;
+        }
+
+        if (!toFetch.add(stateTag)) {
+          throw new IllegalStateException("Duplicate tags being fetched.");
+        }
       }
 
-      if (!toFetch.add(stateTag)) {
-        throw new IllegalStateException("Duplicate tags being fetched.");
+      // If we failed to drain anything, some other thread pulled it off the queue. We have no work
+      // to do.
+      if (toFetch.isEmpty()) {
+        return;
       }
-    }
 
-    // If we failed to drain anything, some other thread pulled it off the queue. We have no work
-    // to do.
-    if (toFetch.isEmpty()) {
-      return;
-    }
+      Windmill.KeyedGetDataRequest request = createRequest(toFetch);
+      Windmill.KeyedGetDataResponse response = server.getStateData(computation, request);
+      if (response == null) {
+        throw new RuntimeException("Windmill unexpectedly returned null for request " + request);
+      }
 
-    Windmill.KeyedGetDataRequest request = createRequest(toFetch);
-    Windmill.KeyedGetDataResponse response = server.getStateData(computation, request);
-    if (response == null) {
-      throw new RuntimeException("Windmill unexpectedly returned null for request " + request);
+      // Removes tags from toFetch as they are processed.
+      consumeResponse(response, toFetch);
+    } catch (RuntimeException e) {

Review Comment:
   Why are you catching RuntimeException and not Exception?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org