You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/29 00:48:56 UTC

[GitHub] [beam] y1chi commented on a change in pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

y1chi commented on a change in pull request #15807:
URL: https://github.com/apache/beam/pull/15807#discussion_r738858366



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -776,6 +790,142 @@ public void testPTransformStartExceptionsArePropagated() {
     assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), empty());
   }
 
+  @Test
+  public void testInstructionIsUnregisteredFromBeamFnDataClientOnSuccess() throws Exception {
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder()
+            .putTransforms(
+                "2L",
+                RunnerApi.PTransform.newBuilder()
+                    .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                    .build())
+            .build();
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+
+    Mockito.doAnswer(
+            (invocation) -> {
+              String instructionId = invocation.getArgument(0, String.class);
+              CloseableFnDataReceiver<BeamFnApi.Elements> data =
+                  invocation.getArgument(2, CloseableFnDataReceiver.class);
+              data.accept(
+                  BeamFnApi.Elements.newBuilder()
+                      .addData(
+                          BeamFnApi.Elements.Data.newBuilder()
+                              .setInstructionId(instructionId)
+                              .setTransformId("2L")
+                              .setIsLast(true))
+                      .build());
+              return null;
+            })
+        .when(beamFnDataClient)
+        .registerReceiver(any(), any(), any());
+
+    ProcessBundleHandler handler =
+        new ProcessBundleHandler(
+            PipelineOptionsFactory.create(),
+            Collections.emptySet(),
+            fnApiRegistry::get,
+            beamFnDataClient,
+            null /* beamFnStateGrpcClientCache */,
+            null /* finalizeBundleHandler */,
+            new ShortIdMap(),
+            ImmutableMap.of(
+                DATA_INPUT_URN,
+                (PTransformRunnerFactory<Object>)
+                    (context) -> {
+                      context.addIncomingDataEndpoint(
+                          ApiServiceDescriptor.getDefaultInstance(),
+                          StringUtf8Coder.of(),
+                          (input) -> {});
+                      return null;
+                    }),
+            new BundleProcessorCache());
+    handler.processBundle(
+        BeamFnApi.InstructionRequest.newBuilder()
+            .setInstructionId("instructionId")
+            .setProcessBundle(
+                BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
+            .build());
+
+    // Ensure that we unregister during successful processing
+    verify(beamFnDataClient).registerReceiver(eq("instructionId"), any(), any());
+    verify(beamFnDataClient).unregisterReceiver(eq("instructionId"), any());
+    verifyNoMoreInteractions(beamFnDataClient);
+  }
+
+  @Test
+  public void testDataProcessingExceptionsArePropagated() throws Exception {
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder()
+            .putTransforms(
+                "2L",
+                RunnerApi.PTransform.newBuilder()
+                    .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                    .build())
+            .build();
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+
+    Mockito.doAnswer(
+            (invocation) -> {
+              ByteString.Output encodedData = ByteString.newOutput();
+              StringUtf8Coder.of().encode("A", encodedData);
+              String instructionId = invocation.getArgument(0, String.class);
+              CloseableFnDataReceiver<BeamFnApi.Elements> data =
+                  invocation.getArgument(2, CloseableFnDataReceiver.class);
+              data.accept(
+                  BeamFnApi.Elements.newBuilder()
+                      .addData(
+                          BeamFnApi.Elements.Data.newBuilder()
+                              .setInstructionId(instructionId)
+                              .setTransformId("2L")
+                              .setData(encodedData.toByteString())
+                              .setIsLast(true))
+                      .build());
+
+              return null;
+            })
+        .when(beamFnDataClient)
+        .registerReceiver(any(), any(), any());
+
+    ProcessBundleHandler handler =
+        new ProcessBundleHandler(
+            PipelineOptionsFactory.create(),
+            Collections.emptySet(),
+            fnApiRegistry::get,
+            beamFnDataClient,
+            null /* beamFnStateGrpcClientCache */,
+            null /* finalizeBundleHandler */,
+            new ShortIdMap(),
+            ImmutableMap.of(
+                DATA_INPUT_URN,
+                (PTransformRunnerFactory<Object>)
+                    (context) -> {
+                      context.addIncomingDataEndpoint(
+                          ApiServiceDescriptor.getDefaultInstance(),
+                          StringUtf8Coder.of(),
+                          (input) -> {
+                            throw new IllegalStateException("TestException");
+                          });
+                      return null;
+                    }),
+            new BundleProcessorCache());
+    assertThrows(
+        "TestException",
+        IllegalStateException.class,
+        () ->
+            handler.processBundle(
+                BeamFnApi.InstructionRequest.newBuilder()
+                    .setInstructionId("instructionId")
+                    .setProcessBundle(
+                        BeamFnApi.ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L"))
+                    .build()));
+
+    // Ensure that we unregister during successful processing

Review comment:
       Ensure we don't unregister when there's an exception?

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
##########
@@ -34,28 +33,36 @@
  */
 public interface BeamFnDataClient {
   /**
-   * Registers the following inbound receiver for the provided instruction id and target.
+   * Registers the following inbound receiver for the provided instruction id.
    *
    * <p>The provided coder is used to decode inbound elements. The decoded elements are passed to
    * the provided receiver. Any failure during decoding or processing of the element will complete
    * the returned future exceptionally. On successful termination of the stream, the returned future
    * is completed successfully.
    *
    * <p>The receiver is not required to be thread safe.
+   *
+   * <p>Receivers for successfully processed bundles must be unregistered. See {@link
+   * #unregisterReceiver} for details.

Review comment:
       Do we need to expose both the register and unregister functions? Why not have the receive function handle registration, wait for completion or an exception, and then unregister internally?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org