You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yi...@apache.org on 2021/10/19 15:59:17 UTC

[beam] branch master updated: [BEAM-13042] Prevent unexpected blocking in RegisterAndProcessBundleOperation.hasFailed

This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 63d9c62  [BEAM-13042] Prevent unexpected blocking in RegisterAndProcessBundleOperation.hasFailed
     new aa4edda  Merge pull request #15716 from scwhittle/fix_unexpected_blocking
63d9c62 is described below

commit 63d9c621e186636980d11a942b15ce561ea0d06c
Author: Sam Whittle <sa...@google.com>
AuthorDate: Wed Oct 13 04:59:48 2021 -0700

    [BEAM-13042] Prevent unexpected blocking in RegisterAndProcessBundleOperation.hasFailed
---
 .../control/RegisterAndProcessBundleOperation.java | 13 +++--
 .../RegisterAndProcessBundleOperationTest.java     | 66 +++++++++++++++++++++-
 2 files changed, 73 insertions(+), 6 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
index b49df39..19e73db 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
@@ -403,12 +403,15 @@ public class RegisterAndProcessBundleOperation extends Operation {
   }
 
   public boolean hasFailed() throws ExecutionException, InterruptedException {
-    if (processBundleResponse != null && processBundleResponse.toCompletableFuture().isDone()) {
-      return !processBundleResponse.toCompletableFuture().get().getError().isEmpty();
-    } else {
-      // At the very least, we don't know that this has failed yet.
-      return false;
+    if (processBundleResponse != null) {
+      @Nullable
+      InstructionResponse response = processBundleResponse.toCompletableFuture().getNow(null);
+      if (response != null) {
+        return !response.getError().isEmpty();
+      }
     }
+    // Either this has not failed yet, or has completed successfully.
+    return false;
   }
 
   /*
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
index 91e8257..2471f10 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
@@ -225,6 +225,7 @@ public class RegisterAndProcessBundleOperationTest {
                 BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("555"))
             .build());
     operation.finish();
+    assertEquals(false, operation.hasFailed());
 
     // Ensure on restart that we only send the process bundle request
     operation.start();
@@ -236,6 +237,7 @@ public class RegisterAndProcessBundleOperationTest {
                 BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("555"))
             .build());
     operation.finish();
+    assertEquals(false, operation.hasFailed());
   }
 
   @Test
@@ -261,7 +263,6 @@ public class RegisterAndProcessBundleOperationTest {
                           // Purposefully sleep simulating SDK harness doing work
                           Thread.sleep(100);
                           responseFuture.complete(responseFor(request).build());
-                          completeFuture(request, responseFuture);
                           return null;
                         });
                     return responseFuture;
@@ -283,6 +284,69 @@ public class RegisterAndProcessBundleOperationTest {
     operation.start();
     // This method blocks till the requests are completed
     operation.finish();
+    assertEquals(false, operation.hasFailed());
+
+    // Ensure that the messages were received
+    assertEquals(
+        requests.get(0),
+        BeamFnApi.InstructionRequest.newBuilder()
+            .setInstructionId("777")
+            .setRegister(REGISTER_REQUEST)
+            .build());
+    assertEquals(
+        requests.get(1),
+        BeamFnApi.InstructionRequest.newBuilder()
+            .setInstructionId("778")
+            .setProcessBundle(
+                BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("555"))
+            .build());
+  }
+
+  @Test
+  public void testProcessingBundleBlocksOnFinishWithError() throws Exception {
+    List<BeamFnApi.InstructionRequest> requests = new ArrayList<>();
+    IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
+    ExecutorService executorService = Executors.newCachedThreadPool();
+    RegisterAndProcessBundleOperation operation =
+        new RegisterAndProcessBundleOperation(
+            idGenerator,
+            new TestInstructionRequestHandler() {
+              @Override
+              public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
+                requests.add(request);
+                switch (request.getRequestCase()) {
+                  case REGISTER:
+                    return CompletableFuture.completedFuture(responseFor(request).build());
+                  case PROCESS_BUNDLE:
+                    CompletableFuture<InstructionResponse> responseFuture =
+                        new CompletableFuture<>();
+                    executorService.submit(
+                        () -> {
+                          // Purposefully sleep simulating SDK harness doing work
+                          Thread.sleep(100);
+                          responseFuture.complete(responseFor(request).setError("error").build());
+                          return null;
+                        });
+                    return responseFuture;
+                  default:
+                    // Anything else hangs; nothing else should be blocking
+                    return new CompletableFuture<>();
+                }
+              }
+            },
+            mockBeamFnStateDelegator,
+            REGISTER_REQUEST,
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            ImmutableTable.of(),
+            ImmutableMap.of(),
+            mockContext);
+
+    operation.start();
+    // This method blocks till the requests are completed
+    operation.finish();
+    assertEquals(true, operation.hasFailed());
 
     // Ensure that the messages were received
     assertEquals(