Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/29 04:07:41 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

lukecwik commented on a change in pull request #15807:
URL: https://github.com/apache/beam/pull/15807#discussion_r738918901



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -685,11 +715,27 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) {
           finishFunctionRegistry,
           resetFunctions::add,
           tearDownFunctions::add,
+          (apiServiceDescriptor, dataEndpoint) -> {
+            if (!bundleProcessor
+                .getInboundEndpointApiServiceDescriptors()
+                .contains(apiServiceDescriptor)) {
+              bundleProcessor.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor);
+            }
+            bundleProcessor.getInboundDataEndpoints().add(dataEndpoint);
+          },
+          (timerEndpoint) -> {
+            if (!bundleDescriptor.hasTimerApiServiceDescriptor()) {
+              throw FailAllTimerRegistrations.fail(processBundleRequest);
+            }
+            bundleProcessor.getTimerEndpoints().add(timerEndpoint);
+          },
           progressRequestCallbacks::add,
           splitListener,
           bundleFinalizer,
           bundleProcessor.getChannelRoots());
     }
+    bundleProcessor.finish();

Review comment:
       I didn't want to take on a mostly unrelated refactoring of a big chunk of the ProcessBundleHandler within this PR, as I had already done one refactoring to migrate the PTransformRunnerFactory to use a Context object instead of passing in all the arguments.
   
   The BundleProcessor::getInstructionId method is passed into the PTransformRunnerFactory context, so the BundleProcessor needs to exist beforehand. However, the BeamFnDataInboundObserver2 can only be constructed once all the endpoints have been registered. This is why many of the lists and internal objects are mutable, even though they don't need to be mutable once all the PTransform runners have been created.
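   To make the ordering constraint concrete, here is a minimal, hypothetical sketch (not Beam's actual code) of the two-phase lifecycle: the processor and its instruction id exist before the runners are created, endpoints are registered into mutable lists, and the inbound observer is only built in `finish()`. The class name `TwoPhaseBundleProcessor`, the String stand-ins for `ApiServiceDescriptor`/`DataEndpoint`, and the placeholder observer are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not Beam's ProcessBundleHandler.BundleProcessor.
// Strings stand in for ApiServiceDescriptor, DataEndpoint and the inbound observer.
class TwoPhaseBundleProcessor {
  private final String instructionId;
  // Mutable only so that PTransform runner creation can register endpoints.
  private final List<String> inboundDescriptors = new ArrayList<>();
  private final List<String> inboundDataEndpoints = new ArrayList<>();
  // Placeholder for the BeamFnDataInboundObserver2; null until finish() runs.
  private String inboundObserver;

  TwoPhaseBundleProcessor(String instructionId) {
    this.instructionId = instructionId;
  }

  // Readable immediately, so runner factories can use it before finish().
  String getInstructionId() {
    return instructionId;
  }

  // Mirrors the registration lambda in the diff: de-duplicate the descriptor,
  // then append the endpoint.
  void registerDataEndpoint(String apiServiceDescriptor, String dataEndpoint) {
    if (inboundObserver != null) {
      throw new IllegalStateException("cannot register endpoints after finish()");
    }
    if (!inboundDescriptors.contains(apiServiceDescriptor)) {
      inboundDescriptors.add(apiServiceDescriptor);
    }
    inboundDataEndpoints.add(dataEndpoint);
  }

  // Second phase: every endpoint is now known, so the observer can be built.
  void finish() {
    if (inboundObserver != null) {
      throw new IllegalStateException("finish() already called");
    }
    inboundObserver =
        "observer[" + instructionId + ", " + inboundDataEndpoints.size() + " endpoints]";
  }

  String getInboundObserver() {
    return inboundObserver;
  }

  List<String> getInboundDescriptors() {
    return Collections.unmodifiableList(inboundDescriptors);
  }

  List<String> getInboundDataEndpoints() {
    return Collections.unmodifiableList(inboundDataEndpoints);
  }
}
```

   The lists have to stay mutable until `finish()` because the object is handed out (via its instruction id) before registration is complete.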
   
   Overall, the internal workings of the ProcessBundleHandler code related to creating the associated PTransforms and the BundleProcessor likely need to move to a builder pattern.
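   A builder-based alternative might look roughly like the following hypothetical sketch: runner factories read the instruction id from the builder, endpoints accumulate in the builder, and `build()` produces an object whose lists are unmodifiable copies. `BuiltBundleProcessor`, its `Builder`, the String stand-ins for the endpoint types, and the `IllegalStateException` standing in for `FailAllTimerRegistrations.fail` are assumptions, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of a builder-based BundleProcessor; not a Beam API.
// Strings stand in for ApiServiceDescriptor, DataEndpoint and TimerEndpoint.
final class BuiltBundleProcessor {
  private final String instructionId;
  private final List<String> inboundDescriptors;
  private final List<String> inboundDataEndpoints;
  private final List<String> timerEndpoints;

  private BuiltBundleProcessor(Builder builder) {
    this.instructionId = builder.instructionId;
    // Unmodifiable copies: later changes to the builder cannot leak in.
    this.inboundDescriptors = copyOf(builder.inboundDescriptors);
    this.inboundDataEndpoints = copyOf(builder.inboundDataEndpoints);
    this.timerEndpoints = copyOf(builder.timerEndpoints);
  }

  private static List<String> copyOf(List<String> list) {
    return Collections.unmodifiableList(new ArrayList<>(list));
  }

  static Builder builder(String instructionId, boolean hasTimerApiServiceDescriptor) {
    return new Builder(instructionId, hasTimerApiServiceDescriptor);
  }

  String getInstructionId() { return instructionId; }
  List<String> getInboundDescriptors() { return inboundDescriptors; }
  List<String> getInboundDataEndpoints() { return inboundDataEndpoints; }
  List<String> getTimerEndpoints() { return timerEndpoints; }

  static final class Builder {
    private final String instructionId;
    private final boolean hasTimerApiServiceDescriptor;
    private final List<String> inboundDescriptors = new ArrayList<>();
    private final List<String> inboundDataEndpoints = new ArrayList<>();
    private final List<String> timerEndpoints = new ArrayList<>();

    private Builder(String instructionId, boolean hasTimerApiServiceDescriptor) {
      this.instructionId = instructionId;
      this.hasTimerApiServiceDescriptor = hasTimerApiServiceDescriptor;
    }

    // Runner factories can read the instruction id before build() is called.
    String getInstructionId() { return instructionId; }

    // De-duplicates descriptors and appends the endpoint, as in the diff's lambda.
    Builder addDataEndpoint(String apiServiceDescriptor, String dataEndpoint) {
      if (!inboundDescriptors.contains(apiServiceDescriptor)) {
        inboundDescriptors.add(apiServiceDescriptor);
      }
      inboundDataEndpoints.add(dataEndpoint);
      return this;
    }

    // Stand-in for the FailAllTimerRegistrations check in the diff.
    Builder addTimerEndpoint(String timerEndpoint) {
      if (!hasTimerApiServiceDescriptor) {
        throw new IllegalStateException("bundle descriptor has no timer ApiServiceDescriptor");
      }
      timerEndpoints.add(timerEndpoint);
      return this;
    }

    BuiltBundleProcessor build() {
      return new BuiltBundleProcessor(this);
    }
  }
}
```

   The design choice here is that mutability is confined to the builder, so the finished processor never exposes lists that can change after all the PTransform runners have been created.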

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -858,7 +909,11 @@ public static BundleProcessor create(
 
     abstract HandleStateCallsForBundle getBeamFnStateClient();
 
-    abstract QueueingBeamFnDataClient getQueueingClient();
+    abstract List<Endpoints.ApiServiceDescriptor> getInboundEndpointApiServiceDescriptors();
+
+    abstract List<DataEndpoint<?>> getInboundDataEndpoints();
+
+    abstract List<TimerEndpoint<?>> getTimerEndpoints();

Review comment:
       Yes, these lists are mutable because of `setup` and `finish`, and this design is overdue for a refactoring.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
