You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yi...@apache.org on 2021/10/19 15:59:17 UTC
[beam] branch master updated: [BEAM-13042] Prevent unexpected blocking in RegisterAndProcessBundleOperation.hasFailed
This is an automated email from the ASF dual-hosted git repository.
yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 63d9c62 [BEAM-13042] Prevent unexpected blocking in RegisterAndProcessBundleOperation.hasFailed
new aa4edda Merge pull request #15716 from scwhittle/fix_unexpected_blocking
63d9c62 is described below
commit 63d9c621e186636980d11a942b15ce561ea0d06c
Author: Sam Whittle <sa...@google.com>
AuthorDate: Wed Oct 13 04:59:48 2021 -0700
[BEAM-13042] Prevent unexpected blocking in RegisterAndProcessBundleOperation.hasFailed
---
.../control/RegisterAndProcessBundleOperation.java | 13 +++--
.../RegisterAndProcessBundleOperationTest.java | 66 +++++++++++++++++++++-
2 files changed, 73 insertions(+), 6 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
index b49df39..19e73db 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
@@ -403,12 +403,15 @@ public class RegisterAndProcessBundleOperation extends Operation {
}
public boolean hasFailed() throws ExecutionException, InterruptedException {
- if (processBundleResponse != null && processBundleResponse.toCompletableFuture().isDone()) {
- return !processBundleResponse.toCompletableFuture().get().getError().isEmpty();
- } else {
- // At the very least, we don't know that this has failed yet.
- return false;
+ if (processBundleResponse != null) {
+ @Nullable
+ InstructionResponse response = processBundleResponse.toCompletableFuture().getNow(null);
+ if (response != null) {
+ return !response.getError().isEmpty();
+ }
}
+ // Either this has not failed yet, or has completed successfully.
+ return false;
}
/*
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
index 91e8257..2471f10 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
@@ -225,6 +225,7 @@ public class RegisterAndProcessBundleOperationTest {
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("555"))
.build());
operation.finish();
+ assertEquals(false, operation.hasFailed());
// Ensure on restart that we only send the process bundle request
operation.start();
@@ -236,6 +237,7 @@ public class RegisterAndProcessBundleOperationTest {
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("555"))
.build());
operation.finish();
+ assertEquals(false, operation.hasFailed());
}
@Test
@@ -261,7 +263,6 @@ public class RegisterAndProcessBundleOperationTest {
// Purposefully sleep simulating SDK harness doing work
Thread.sleep(100);
responseFuture.complete(responseFor(request).build());
- completeFuture(request, responseFuture);
return null;
});
return responseFuture;
@@ -283,6 +284,69 @@ public class RegisterAndProcessBundleOperationTest {
operation.start();
// This method blocks till the requests are completed
operation.finish();
+ assertEquals(false, operation.hasFailed());
+
+ // Ensure that the messages were received
+ assertEquals(
+ requests.get(0),
+ BeamFnApi.InstructionRequest.newBuilder()
+ .setInstructionId("777")
+ .setRegister(REGISTER_REQUEST)
+ .build());
+ assertEquals(
+ requests.get(1),
+ BeamFnApi.InstructionRequest.newBuilder()
+ .setInstructionId("778")
+ .setProcessBundle(
+ BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("555"))
+ .build());
+ }
+
+ @Test
+ public void testProcessingBundleBlocksOnFinishWithError() throws Exception {
+ List<BeamFnApi.InstructionRequest> requests = new ArrayList<>();
+ IdGenerator idGenerator = makeIdGeneratorStartingFrom(777L);
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ RegisterAndProcessBundleOperation operation =
+ new RegisterAndProcessBundleOperation(
+ idGenerator,
+ new TestInstructionRequestHandler() {
+ @Override
+ public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
+ requests.add(request);
+ switch (request.getRequestCase()) {
+ case REGISTER:
+ return CompletableFuture.completedFuture(responseFor(request).build());
+ case PROCESS_BUNDLE:
+ CompletableFuture<InstructionResponse> responseFuture =
+ new CompletableFuture<>();
+ executorService.submit(
+ () -> {
+ // Purposefully sleep simulating SDK harness doing work
+ Thread.sleep(100);
+ responseFuture.complete(responseFor(request).setError("error").build());
+ return null;
+ });
+ return responseFuture;
+ default:
+ // Anything else hangs; nothing else should be blocking
+ return new CompletableFuture<>();
+ }
+ }
+ },
+ mockBeamFnStateDelegator,
+ REGISTER_REQUEST,
+ ImmutableMap.of(),
+ ImmutableMap.of(),
+ ImmutableMap.of(),
+ ImmutableTable.of(),
+ ImmutableMap.of(),
+ mockContext);
+
+ operation.start();
+ // This method blocks till the requests are completed
+ operation.finish();
+ assertEquals(true, operation.hasFailed());
// Ensure that the messages were received
assertEquals(