Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/29 00:12:56 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

TheNeuralBit commented on a change in pull request #15807:
URL: https://github.com/apache/beam/pull/15807#discussion_r738850651



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -685,11 +715,27 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) {
           finishFunctionRegistry,
           resetFunctions::add,
           tearDownFunctions::add,
+          (apiServiceDescriptor, dataEndpoint) -> {
+            if (!bundleProcessor
+                .getInboundEndpointApiServiceDescriptors()
+                .contains(apiServiceDescriptor)) {
+              bundleProcessor.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor);
+            }
+            bundleProcessor.getInboundDataEndpoints().add(dataEndpoint);
+          },
+          (timerEndpoint) -> {
+            if (!bundleDescriptor.hasTimerApiServiceDescriptor()) {
+              throw FailAllTimerRegistrations.fail(processBundleRequest);
+            }
+            bundleProcessor.getTimerEndpoints().add(timerEndpoint);
+          },
           progressRequestCallbacks::add,
           splitListener,
           bundleFinalizer,
           bundleProcessor.getChannelRoots());
     }
+    bundleProcessor.finish();

Review comment:
       This finish logic feels error-prone. Couldn't you create all the members first, and then construct the `BundleProcessor`, fully realized?
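
A minimal sketch of the "collect first, construct last" shape this comment asks about. Everything here is hypothetical: `BundleProcessorSketch`, its `Builder`, and the use of `String` in place of the real endpoint types are stand-ins for illustration, not Beam's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for BundleProcessor; String replaces the real endpoint types.
final class BundleProcessorSketch {
  private final List<String> dataEndpoints;
  private final List<String> timerEndpoints;

  // Private: the only way to obtain an instance is Builder.build().
  private BundleProcessorSketch(List<String> dataEndpoints, List<String> timerEndpoints) {
    this.dataEndpoints = dataEndpoints;
    this.timerEndpoints = timerEndpoints;
  }

  List<String> getDataEndpoints() {
    return dataEndpoints;
  }

  List<String> getTimerEndpoints() {
    return timerEndpoints;
  }

  // Receives registrations while the runners are being created.
  static final class Builder {
    private final List<String> dataEndpoints = new ArrayList<>();
    private final List<String> timerEndpoints = new ArrayList<>();

    Builder addDataEndpoint(String endpoint) {
      dataEndpoints.add(endpoint);
      return this;
    }

    Builder addTimerEndpoint(String endpoint) {
      timerEndpoints.add(endpoint);
      return this;
    }

    // Construction happens once, after setup, so no separate finish() call is needed.
    BundleProcessorSketch build() {
      return new BundleProcessorSketch(
          new ArrayList<>(dataEndpoints), new ArrayList<>(timerEndpoints));
    }
  }
}
```

With this shape the registration callbacks would target the builder (for example `builder::addTimerEndpoint`), and callers could never observe a half-initialized processor.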

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java
##########
@@ -49,7 +49,10 @@
  *
  * <p>TODO: Add support for multiplexing over multiple outbound observers by stickying the output
  * location with a specific outbound observer.
+ *
+ * @deprecated Migrate to {@link BeamFnDataGrpcMultiplexer2}.

Review comment:
       Are there other usages of these classes that prevent us from going ahead and removing them?

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PTransformRunnerFactoryTestContext.java
##########
@@ -210,6 +222,34 @@ public void accept(T input) throws Exception {
     };
   }
 
+  public abstract Map<ApiServiceDescriptor, List<DataEndpoint<?>>> getIncomingDataEndpoints();
+
+  @Override
+  public <T> void addIncomingDataEndpoint(
+      ApiServiceDescriptor apiServiceDescriptor, Coder<T> coder, FnDataReceiver<T> receiver) {
+    getIncomingDataEndpoints()
+        .computeIfAbsent(apiServiceDescriptor, (unused) -> new ArrayList<>())
+        .add(DataEndpoint.create(getPTransformId(), coder, receiver));
+  }
+
+  public abstract List<TimerEndpoint<?>> getIncomingTimerEndpoints();
+
+  public <T> TimerEndpoint<T> getIncomingTimerEndpoint(String timerFamilyId) {
+    for (TimerEndpoint<?> timerEndpoint : getIncomingTimerEndpoints()) {
+      if (timerFamilyId.equals(timerEndpoint.getTimerFamilyId())) {
+        return (TimerEndpoint<T>) timerEndpoint;
+      }
+    }

Review comment:
       I guess this is just a test class, but why not store the timer endpoints in a map rather than iterating over them?
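
A rough sketch of the map-based lookup, under stated assumptions: `TimerEndpointSketch` and `TimerEndpointRegistry` are hypothetical names, not Beam types, and throwing on an unknown id is an assumption, since the quoted hunk ends before showing what the original method does when nothing matches.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TimerEndpoint<T>; only the family id matters here.
final class TimerEndpointSketch<T> {
  private final String timerFamilyId;

  TimerEndpointSketch(String timerFamilyId) {
    this.timerFamilyId = timerFamilyId;
  }

  String getTimerFamilyId() {
    return timerFamilyId;
  }
}

// Hypothetical holder that indexes endpoints by timer family id.
final class TimerEndpointRegistry {
  private final Map<String, TimerEndpointSketch<?>> byFamilyId = new HashMap<>();

  void add(TimerEndpointSketch<?> endpoint) {
    // putIfAbsent keeps the first endpoint registered for an id,
    // matching the "first match wins" behavior of a linear scan.
    byFamilyId.putIfAbsent(endpoint.getTimerFamilyId(), endpoint);
  }

  @SuppressWarnings("unchecked") // The caller chooses T, as with the cast in the loop version.
  <T> TimerEndpointSketch<T> get(String timerFamilyId) {
    TimerEndpointSketch<?> endpoint = byFamilyId.get(timerFamilyId);
    if (endpoint == null) {
      // Assumption: an unknown id is treated as an error.
      throw new IllegalArgumentException("Unknown timer family id: " + timerFamilyId);
    }
    return (TimerEndpointSketch<T>) endpoint;
  }
}
```

The unchecked cast remains either way; the map only replaces the scan with a keyed lookup.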

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -858,7 +909,11 @@ public static BundleProcessor create(
 
     abstract HandleStateCallsForBundle getBeamFnStateClient();
 
-    abstract QueueingBeamFnDataClient getQueueingClient();
+    abstract List<Endpoints.ApiServiceDescriptor> getInboundEndpointApiServiceDescriptors();
+
+    abstract List<DataEndpoint<?>> getInboundDataEndpoints();
+
+    abstract List<TimerEndpoint<?>> getTimerEndpoints();

Review comment:
       Do these need to be mutable lists (i.e. will they be modified after creation), or are they just mutable because of the setup and `finish()` logic?
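
If the lists are only mutable because of the setup phase, one option is to freeze them at construction time. The sketch below uses a hypothetical `FrozenEndpoints` class with `String` elements in place of the real endpoint types; it relies only on `Collections.unmodifiableList` from the JDK.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical holder; String stands in for DataEndpoint<?> or TimerEndpoint<?>.
final class FrozenEndpoints {
  private final List<String> dataEndpoints;

  FrozenEndpoints(List<String> collectedDuringSetup) {
    // Copy first so later changes to the setup list cannot leak in,
    // then wrap the copy so callers cannot modify it.
    this.dataEndpoints = Collections.unmodifiableList(new ArrayList<>(collectedDuringSetup));
  }

  List<String> getDataEndpoints() {
    return dataEndpoints;
  }
}
```

Any attempt to call `add` on the returned list throws `UnsupportedOperationException`, which makes accidental modification after creation fail fast.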



