Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/26 22:25:56 UTC

[GitHub] [beam] lukecwik opened a new pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

lukecwik opened a new pull request #15807:
URL: https://github.com/apache/beam/pull/15807


   I didn't want to add the complexity of a BundleProcessor class clean-up and ProcessBundleHandler clean-up to the implementation so I used the existing constructs as best as I could.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik merged pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #15807:
URL: https://github.com/apache/beam/pull/15807


   





[GitHub] [beam] lukecwik commented on a change in pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15807:
URL: https://github.com/apache/beam/pull/15807#discussion_r738921411



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -776,6 +790,142 @@ public void testPTransformStartExceptionsArePropagated() {
     assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), empty());
   }
 
+  @Test
+  public void testInstructionIsUnregisteredFromBeamFnDataClientOnSuccess() throws Exception {
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder()
+            .putTransforms(
+                "2L",
+                RunnerApi.PTransform.newBuilder()
+                    .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                    .build())
+            .build();
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+
+    Mockito.doAnswer(
+            (invocation) -> {
+              String instructionId = invocation.getArgument(0, String.class);
+              CloseableFnDataReceiver<BeamFnApi.Elements> data =
+                  invocation.getArgument(2, CloseableFnDataReceiver.class);
+              data.accept(
+                  BeamFnApi.Elements.newBuilder()
+                      .addData(
+                          BeamFnApi.Elements.Data.newBuilder()
+                              .setInstructionId(instructionId)
+                              .setTransformId("2L")
+                              .setIsLast(true))
+                      .build());
+              return null;
+            })
+        .when(beamFnDataClient)
+        .registerReceiver(any(), any(), any());
+
+    ProcessBundleHandler handler =
+        new ProcessBundleHandler(
+            PipelineOptionsFactory.create(),
+            Collections.emptySet(),
+            fnApiRegistry::get,
+            beamFnDataClient,
+            null /* beamFnStateGrpcClientCache */,
+            null /* finalizeBundleHandler */,
+            new ShortIdMap(),
+            ImmutableMap.of(
+                DATA_INPUT_URN,
+                (PTransformRunnerFactory<Object>)
+                    (context) -> {
+                      context.addIncomingDataEndpoint(
+                          ApiServiceDescriptor.getDefaultInstance(),
+                          StringUtf8Coder.of(),
+                          (input) -> {});
+                      return null;
+                    }),
+            new BundleProcessorCache());
+    handler.processBundle(
+        BeamFnApi.InstructionRequest.newBuilder()
+            .setInstructionId("instructionId")
+            .setProcessBundle(
+                BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
+            .build());
+
+    // Ensure that we unregister during successful processing
+    verify(beamFnDataClient).registerReceiver(eq("instructionId"), any(), any());
+    verify(beamFnDataClient).unregisterReceiver(eq("instructionId"), any());
+    verifyNoMoreInteractions(beamFnDataClient);
+  }
+
+  @Test
+  public void testDataProcessingExceptionsArePropagated() throws Exception {
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder()
+            .putTransforms(
+                "2L",
+                RunnerApi.PTransform.newBuilder()
+                    .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                    .build())
+            .build();
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+
+    Mockito.doAnswer(
+            (invocation) -> {
+              ByteString.Output encodedData = ByteString.newOutput();
+              StringUtf8Coder.of().encode("A", encodedData);
+              String instructionId = invocation.getArgument(0, String.class);
+              CloseableFnDataReceiver<BeamFnApi.Elements> data =
+                  invocation.getArgument(2, CloseableFnDataReceiver.class);
+              data.accept(
+                  BeamFnApi.Elements.newBuilder()
+                      .addData(
+                          BeamFnApi.Elements.Data.newBuilder()
+                              .setInstructionId(instructionId)
+                              .setTransformId("2L")
+                              .setData(encodedData.toByteString())
+                              .setIsLast(true))
+                      .build());
+
+              return null;
+            })
+        .when(beamFnDataClient)
+        .registerReceiver(any(), any(), any());
+
+    ProcessBundleHandler handler =
+        new ProcessBundleHandler(
+            PipelineOptionsFactory.create(),
+            Collections.emptySet(),
+            fnApiRegistry::get,
+            beamFnDataClient,
+            null /* beamFnStateGrpcClientCache */,
+            null /* finalizeBundleHandler */,
+            new ShortIdMap(),
+            ImmutableMap.of(
+                DATA_INPUT_URN,
+                (PTransformRunnerFactory<Object>)
+                    (context) -> {
+                      context.addIncomingDataEndpoint(
+                          ApiServiceDescriptor.getDefaultInstance(),
+                          StringUtf8Coder.of(),
+                          (input) -> {
+                            throw new IllegalStateException("TestException");
+                          });
+                      return null;
+                    }),
+            new BundleProcessorCache());
+    assertThrows(
+        "TestException",
+        IllegalStateException.class,
+        () ->
+            handler.processBundle(
+                BeamFnApi.InstructionRequest.newBuilder()
+                    .setInstructionId("instructionId")
+                    .setProcessBundle(
+                        BeamFnApi.ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L"))
+                    .build()));
+
+    // Ensure that we unregister during successful processing

Review comment:
       We don't want to unregister when bundle processing fails, because that would remove the consumer for that instruction id. The runner could still be sending us data for that instruction (for example, data could be sitting in a network buffer), and with the consumer gone that would cause things to get stuck.
   
   We rely on `BeamFnDataInboundObserver2#awaitCompletion` to close its internal queue when it detects an exception during processing in the bundle processing thread (before the exception is rethrown and fails bundle processing). Closing the queue makes the failure visible to the gRPC read thread if it ever enqueues anything, at which point the read thread can mark the instruction id as bad. Similarly, if the gRPC read thread fails for some reason or is closed, the queue is closed, which allows bundle processing threads to stop waiting and fail with an exception.
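
The hand-off described in this comment can be illustrated with a minimal sketch. It is not the `BeamFnDataInboundObserver2` implementation: the class `ClosableQueueSketch`, its methods, the queue capacity, and the polling interval are all hypothetical names and values chosen for illustration. It only assumes the behaviour stated above, namely that a failure on either side closes the shared queue so the other side fails instead of waiting forever.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical stand-in for a queue shared by a read thread and a processing thread. */
final class ClosableQueueSketch<T> {
  private final BlockingQueue<T> queue = new ArrayBlockingQueue<>(16);
  // Null while the queue is open; holds the first failure once closed.
  private final AtomicReference<Throwable> closedCause = new AtomicReference<>();

  /** Closes the queue. Either side may call this; only the first cause is kept. */
  void close(Throwable cause) {
    closedCause.compareAndSet(null, cause);
  }

  private void throwIfClosed() {
    Throwable cause = closedCause.get();
    if (cause != null) {
      throw new IllegalStateException("queue closed", cause);
    }
  }

  /** Read-thread side: fails if the processing side has already closed the queue. */
  void enqueue(T element) {
    try {
      do {
        throwIfClosed();
      } while (!queue.offer(element, 50, TimeUnit.MILLISECONDS));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while enqueueing", e);
    }
  }

  /** Processing side: waits for one element; a consumer failure closes the queue, then is rethrown. */
  void processNext(Consumer<T> consumer) {
    T element = null;
    try {
      while (element == null) {
        throwIfClosed();
        element = queue.poll(50, TimeUnit.MILLISECONDS);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for data", e);
    }
    try {
      consumer.accept(element);
    } catch (RuntimeException e) {
      close(e); // make the failure visible to the read thread before rethrowing
      throw e;
    }
  }
}
```

In this sketch the receiver is never removed on failure. The read thread simply finds out about the failure the next time it tries to enqueue, which is the point where a real implementation could mark the instruction id as bad.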







[GitHub] [beam] lukecwik commented on a change in pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15807:
URL: https://github.com/apache/beam/pull/15807#discussion_r738922064



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
##########
@@ -34,28 +33,36 @@
  */
 public interface BeamFnDataClient {
   /**
-   * Registers the following inbound receiver for the provided instruction id and target.
+   * Registers the following inbound receiver for the provided instruction id.
    *
    * <p>The provided coder is used to decode inbound elements. The decoded elements are passed to
    * the provided receiver. Any failure during decoding or processing of the element will complete
    * the returned future exceptionally. On successful termination of the stream, the returned future
    * is completed successfully.
    *
    * <p>The receiver is not required to be thread safe.
+   *
+   * <p>Receivers for successfully processed bundles must be unregistered. See {@link
+   * #unregisterReceiver} for details.

Review comment:
       I wanted to make the caller of register responsible for the clean-up on success; putting it in another object seemed to hide how the code worked.
   
   On a minor note, this also slightly reduced how much state we needed to mutate and how many additional objects we would have needed to create and track for some kind of "unregister" handle.
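
A minimal sketch of the caller-owned clean-up described here, under loud assumptions: `ReceiverRegistrySketch` and its simplified one- and two-argument methods are hypothetical and do not match the actual `BeamFnDataClient` signatures, which take more parameters in the diff above. The sketch only shows the shape of the contract: the code that registers a receiver also unregisters it, and only after the work succeeds.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical, simplified registry; not the real BeamFnDataClient interface. */
final class ReceiverRegistrySketch {
  private final Map<String, Consumer<String>> receivers = new ConcurrentHashMap<>();

  void registerReceiver(String instructionId, Consumer<String> receiver) {
    if (receivers.putIfAbsent(instructionId, receiver) != null) {
      throw new IllegalStateException("receiver already registered for " + instructionId);
    }
  }

  void unregisterReceiver(String instructionId) {
    receivers.remove(instructionId);
  }

  boolean isRegistered(String instructionId) {
    return receivers.containsKey(instructionId);
  }

  /**
   * The caller registers, does the work, and unregisters itself. If the work throws,
   * the unregister call is skipped on purpose, so the receiver stays in place.
   */
  static void processBundle(ReceiverRegistrySketch client, String instructionId, Runnable work) {
    client.registerReceiver(instructionId, element -> {});
    work.run();
    client.unregisterReceiver(instructionId); // reached only on success
  }
}
```

Because the registry is keyed by instruction id, no separate "unregister" handle object has to be created or tracked in this sketch; the id the caller already holds is enough.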







[GitHub] [beam] apilloud commented on pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
apilloud commented on pull request #15807:
URL: https://github.com/apache/beam/pull/15807#issuecomment-957013117


   It is possible this broke the `Run Java Spark PortableValidatesRunner Batch` test. After this change the test is reliably timing out (BEAM-13164). I have #15861 to up the timeout, but it looks like tests may be failing as well.





[GitHub] [beam] y1chi commented on a change in pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #15807:
URL: https://github.com/apache/beam/pull/15807#discussion_r738858366



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -776,6 +790,142 @@ public void testPTransformStartExceptionsArePropagated() {
     assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), empty());
   }
 
+  @Test
+  public void testInstructionIsUnregisteredFromBeamFnDataClientOnSuccess() throws Exception {
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder()
+            .putTransforms(
+                "2L",
+                RunnerApi.PTransform.newBuilder()
+                    .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                    .build())
+            .build();
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+
+    Mockito.doAnswer(
+            (invocation) -> {
+              String instructionId = invocation.getArgument(0, String.class);
+              CloseableFnDataReceiver<BeamFnApi.Elements> data =
+                  invocation.getArgument(2, CloseableFnDataReceiver.class);
+              data.accept(
+                  BeamFnApi.Elements.newBuilder()
+                      .addData(
+                          BeamFnApi.Elements.Data.newBuilder()
+                              .setInstructionId(instructionId)
+                              .setTransformId("2L")
+                              .setIsLast(true))
+                      .build());
+              return null;
+            })
+        .when(beamFnDataClient)
+        .registerReceiver(any(), any(), any());
+
+    ProcessBundleHandler handler =
+        new ProcessBundleHandler(
+            PipelineOptionsFactory.create(),
+            Collections.emptySet(),
+            fnApiRegistry::get,
+            beamFnDataClient,
+            null /* beamFnStateGrpcClientCache */,
+            null /* finalizeBundleHandler */,
+            new ShortIdMap(),
+            ImmutableMap.of(
+                DATA_INPUT_URN,
+                (PTransformRunnerFactory<Object>)
+                    (context) -> {
+                      context.addIncomingDataEndpoint(
+                          ApiServiceDescriptor.getDefaultInstance(),
+                          StringUtf8Coder.of(),
+                          (input) -> {});
+                      return null;
+                    }),
+            new BundleProcessorCache());
+    handler.processBundle(
+        BeamFnApi.InstructionRequest.newBuilder()
+            .setInstructionId("instructionId")
+            .setProcessBundle(
+                BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
+            .build());
+
+    // Ensure that we unregister during successful processing
+    verify(beamFnDataClient).registerReceiver(eq("instructionId"), any(), any());
+    verify(beamFnDataClient).unregisterReceiver(eq("instructionId"), any());
+    verifyNoMoreInteractions(beamFnDataClient);
+  }
+
+  @Test
+  public void testDataProcessingExceptionsArePropagated() throws Exception {
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder()
+            .putTransforms(
+                "2L",
+                RunnerApi.PTransform.newBuilder()
+                    .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                    .build())
+            .build();
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+
+    Mockito.doAnswer(
+            (invocation) -> {
+              ByteString.Output encodedData = ByteString.newOutput();
+              StringUtf8Coder.of().encode("A", encodedData);
+              String instructionId = invocation.getArgument(0, String.class);
+              CloseableFnDataReceiver<BeamFnApi.Elements> data =
+                  invocation.getArgument(2, CloseableFnDataReceiver.class);
+              data.accept(
+                  BeamFnApi.Elements.newBuilder()
+                      .addData(
+                          BeamFnApi.Elements.Data.newBuilder()
+                              .setInstructionId(instructionId)
+                              .setTransformId("2L")
+                              .setData(encodedData.toByteString())
+                              .setIsLast(true))
+                      .build());
+
+              return null;
+            })
+        .when(beamFnDataClient)
+        .registerReceiver(any(), any(), any());
+
+    ProcessBundleHandler handler =
+        new ProcessBundleHandler(
+            PipelineOptionsFactory.create(),
+            Collections.emptySet(),
+            fnApiRegistry::get,
+            beamFnDataClient,
+            null /* beamFnStateGrpcClientCache */,
+            null /* finalizeBundleHandler */,
+            new ShortIdMap(),
+            ImmutableMap.of(
+                DATA_INPUT_URN,
+                (PTransformRunnerFactory<Object>)
+                    (context) -> {
+                      context.addIncomingDataEndpoint(
+                          ApiServiceDescriptor.getDefaultInstance(),
+                          StringUtf8Coder.of(),
+                          (input) -> {
+                            throw new IllegalStateException("TestException");
+                          });
+                      return null;
+                    }),
+            new BundleProcessorCache());
+    assertThrows(
+        "TestException",
+        IllegalStateException.class,
+        () ->
+            handler.processBundle(
+                BeamFnApi.InstructionRequest.newBuilder()
+                    .setInstructionId("instructionId")
+                    .setProcessBundle(
+                        BeamFnApi.ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L"))
+                    .build()));
+
+    // Ensure that we unregister during successful processing

Review comment:
       Ensure we don't unregister when there's an exception?

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
##########
@@ -34,28 +33,36 @@
  */
 public interface BeamFnDataClient {
   /**
-   * Registers the following inbound receiver for the provided instruction id and target.
+   * Registers the following inbound receiver for the provided instruction id.
    *
    * <p>The provided coder is used to decode inbound elements. The decoded elements are passed to
    * the provided receiver. Any failure during decoding or processing of the element will complete
    * the returned future exceptionally. On successful termination of the stream, the returned future
    * is completed successfully.
    *
    * <p>The receiver is not required to be thread safe.
+   *
+   * <p>Receivers for successfully processed bundles must be unregistered. See {@link
+   * #unregisterReceiver} for details.

Review comment:
       Do we need to expose both the register and unregister functions? Why not have a single receive function that handles registration, waits for completion or an exception, and unregisters internally?
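   
   A rough sketch of what such a single entry point could look like. `ScopedDataClient` and `receive` are hypothetical names, and unregistering only after success is an assumption made to match the Javadoc above:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical client that hides registration behind one method.
class ScopedDataClient {
  final Set<String> registered = new HashSet<>();

  // Registers, runs the work to completion, then unregisters internally.
  <T> T receive(String instructionId, Supplier<T> work) {
    registered.add(instructionId);
    T result = work.get(); // An exception here propagates to the caller.
    registered.remove(instructionId); // Assumption: only reached on success.
    return result;
  }
}
```

   The trade-off is that callers no longer see when clean-up happens.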







[GitHub] [beam] lukecwik removed a comment on pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on pull request #15807:
URL: https://github.com/apache/beam/pull/15807#issuecomment-953075612









[GitHub] [beam] lukecwik commented on pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15807:
URL: https://github.com/apache/beam/pull/15807#issuecomment-952375823









[GitHub] [beam] lukecwik commented on a change in pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15807:
URL: https://github.com/apache/beam/pull/15807#discussion_r738919562



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PTransformRunnerFactoryTestContext.java
##########
@@ -210,6 +222,34 @@ public void accept(T input) throws Exception {
     };
   }
 
+  public abstract Map<ApiServiceDescriptor, List<DataEndpoint<?>>> getIncomingDataEndpoints();
+
+  @Override
+  public <T> void addIncomingDataEndpoint(
+      ApiServiceDescriptor apiServiceDescriptor, Coder<T> coder, FnDataReceiver<T> receiver) {
+    getIncomingDataEndpoints()
+        .computeIfAbsent(apiServiceDescriptor, (unused) -> new ArrayList<>())
+        .add(DataEndpoint.create(getPTransformId(), coder, receiver));
+  }
+
+  public abstract List<TimerEndpoint<?>> getIncomingTimerEndpoints();
+
+  public <T> TimerEndpoint<T> getIncomingTimerEndpoint(String timerFamilyId) {
+    for (TimerEndpoint<?> timerEndpoint : getIncomingTimerEndpoints()) {
+      if (timerFamilyId.equals(timerEndpoint.getTimerFamilyId())) {
+        return (TimerEndpoint<T>) timerEndpoint;
+      }
+    }

Review comment:
       It was easy.







[GitHub] [beam] lukecwik commented on a change in pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15807:
URL: https://github.com/apache/beam/pull/15807#discussion_r738918901



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -858,7 +909,11 @@ public static BundleProcessor create(
 
     abstract HandleStateCallsForBundle getBeamFnStateClient();
 
-    abstract QueueingBeamFnDataClient getQueueingClient();
+    abstract List<Endpoints.ApiServiceDescriptor> getInboundEndpointApiServiceDescriptors();
+
+    abstract List<DataEndpoint<?>> getInboundDataEndpoints();
+
+    abstract List<TimerEndpoint<?>> getTimerEndpoints();

Review comment:
       Yes, mutable because of `setup` and `finish` and overdue for a refactoring.







[GitHub] [beam] lukecwik commented on pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15807:
URL: https://github.com/apache/beam/pull/15807#issuecomment-952376265


   R: @TheNeuralBit @y1chi 





[GitHub] [beam] lukecwik commented on a change in pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15807:
URL: https://github.com/apache/beam/pull/15807#discussion_r738918901



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -685,11 +715,27 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) {
           finishFunctionRegistry,
           resetFunctions::add,
           tearDownFunctions::add,
+          (apiServiceDescriptor, dataEndpoint) -> {
+            if (!bundleProcessor
+                .getInboundEndpointApiServiceDescriptors()
+                .contains(apiServiceDescriptor)) {
+              bundleProcessor.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor);
+            }
+            bundleProcessor.getInboundDataEndpoints().add(dataEndpoint);
+          },
+          (timerEndpoint) -> {
+            if (!bundleDescriptor.hasTimerApiServiceDescriptor()) {
+              throw FailAllTimerRegistrations.fail(processBundleRequest);
+            }
+            bundleProcessor.getTimerEndpoints().add(timerEndpoint);
+          },
           progressRequestCallbacks::add,
           splitListener,
           bundleFinalizer,
           bundleProcessor.getChannelRoots());
     }
+    bundleProcessor.finish();

Review comment:
       I didn't want to take on a mostly unrelated refactoring of a big chunk of the ProcessBundleHandler within this PR as I had already done one refactoring to migrate the PTransformRunnerFactory to use a Context object instead of passing in all the arguments.
   
   The `BundleProcessor::getInstructionId` is passed in to the PTransformRunnerFactory context, so the BundleProcessor needs to exist beforehand. However, the BeamFnDataInboundObserver2 can only be constructed once all the endpoints have been registered. That is why many of the lists and internal objects are mutable, even though they don't need to be once all the PTransform runners are created.
   
   Overall the internal workings of the ProcessBundleHandler code related to creating the associated PTransforms and  BundleProcessor likely need to move to a builder pattern where we have one object responsible for creating the transforms and a different object responsible for executing bundles.
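   
   A minimal sketch of that split, with hypothetical simplified types (`BundleProcessorSketch`, string endpoints) standing in for the real classes. One object collects endpoints while the PTransform runners are created, and the object that executes bundles is built once and is immutable afterwards:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified type; the real BundleProcessor carries far more state.
final class BundleProcessorSketch {
  private final List<String> dataEndpoints;
  private final List<String> timerEndpoints;

  private BundleProcessorSketch(List<String> dataEndpoints, List<String> timerEndpoints) {
    // Defensive copies make the lists immutable once the processor exists.
    this.dataEndpoints = Collections.unmodifiableList(new ArrayList<>(dataEndpoints));
    this.timerEndpoints = Collections.unmodifiableList(new ArrayList<>(timerEndpoints));
  }

  List<String> getDataEndpoints() {
    return dataEndpoints;
  }

  List<String> getTimerEndpoints() {
    return timerEndpoints;
  }

  // Collects endpoints during transform creation, then builds the processor once.
  static final class Builder {
    private final List<String> dataEndpoints = new ArrayList<>();
    private final List<String> timerEndpoints = new ArrayList<>();

    Builder addDataEndpoint(String endpoint) {
      dataEndpoints.add(endpoint);
      return this;
    }

    Builder addTimerEndpoint(String endpoint) {
      timerEndpoints.add(endpoint);
      return this;
    }

    BundleProcessorSketch build() {
      return new BundleProcessorSketch(dataEndpoints, timerEndpoints);
    }
  }
}
```

   With this shape there is no separate `finish()` step to forget, because `build()` is the only way to obtain a processor.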







[GitHub] [beam] lukecwik commented on pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15807:
URL: https://github.com/apache/beam/pull/15807#issuecomment-953075612


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15807:
URL: https://github.com/apache/beam/pull/15807#issuecomment-952375984


   Run Java Samza PortableValidatesRunner





[GitHub] [beam] TheNeuralBit commented on a change in pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #15807:
URL: https://github.com/apache/beam/pull/15807#discussion_r738850651



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -685,11 +715,27 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) {
           finishFunctionRegistry,
           resetFunctions::add,
           tearDownFunctions::add,
+          (apiServiceDescriptor, dataEndpoint) -> {
+            if (!bundleProcessor
+                .getInboundEndpointApiServiceDescriptors()
+                .contains(apiServiceDescriptor)) {
+              bundleProcessor.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor);
+            }
+            bundleProcessor.getInboundDataEndpoints().add(dataEndpoint);
+          },
+          (timerEndpoint) -> {
+            if (!bundleDescriptor.hasTimerApiServiceDescriptor()) {
+              throw FailAllTimerRegistrations.fail(processBundleRequest);
+            }
+            bundleProcessor.getTimerEndpoints().add(timerEndpoint);
+          },
           progressRequestCallbacks::add,
           splitListener,
           bundleFinalizer,
           bundleProcessor.getChannelRoots());
     }
+    bundleProcessor.finish();

Review comment:
       This finish logic feels error-prone. Couldn't you create all the members first, and then construct the `BundleProcessor`, fully realized?

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java
##########
@@ -49,7 +49,10 @@
  *
  * <p>TODO: Add support for multiplexing over multiple outbound observers by stickying the output
  * location with a specific outbound observer.
+ *
+ * @deprecated Migrate to {@link BeamFnDataGrpcMultiplexer2}.

Review comment:
       Are there other usages of these classes that prevent us from going ahead and removing them?

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PTransformRunnerFactoryTestContext.java
##########
@@ -210,6 +222,34 @@ public void accept(T input) throws Exception {
     };
   }
 
+  public abstract Map<ApiServiceDescriptor, List<DataEndpoint<?>>> getIncomingDataEndpoints();
+
+  @Override
+  public <T> void addIncomingDataEndpoint(
+      ApiServiceDescriptor apiServiceDescriptor, Coder<T> coder, FnDataReceiver<T> receiver) {
+    getIncomingDataEndpoints()
+        .computeIfAbsent(apiServiceDescriptor, (unused) -> new ArrayList<>())
+        .add(DataEndpoint.create(getPTransformId(), coder, receiver));
+  }
+
+  public abstract List<TimerEndpoint<?>> getIncomingTimerEndpoints();
+
+  public <T> TimerEndpoint<T> getIncomingTimerEndpoint(String timerFamilyId) {
+    for (TimerEndpoint<?> timerEndpoint : getIncomingTimerEndpoints()) {
+      if (timerFamilyId.equals(timerEndpoint.getTimerFamilyId())) {
+        return (TimerEndpoint<T>) timerEndpoint;
+      }
+    }

Review comment:
       I guess this is just a test class, but why not store the timer endpoints in a map rather than iterating over them?
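   
   Something along these lines, as a sketch only. `TimerEndpointSketch` and `TimerEndpointRegistry` are hypothetical simplified types, and throwing on an unknown id is an assumption, since the hunk above is cut off before the fall-through case:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified endpoint type; not Beam's TimerEndpoint.
final class TimerEndpointSketch {
  final String timerFamilyId;

  TimerEndpointSketch(String timerFamilyId) {
    this.timerFamilyId = timerFamilyId;
  }
}

final class TimerEndpointRegistry {
  private final Map<String, TimerEndpointSketch> byFamilyId = new HashMap<>();

  void add(TimerEndpointSketch endpoint) {
    byFamilyId.put(endpoint.timerFamilyId, endpoint);
  }

  // Direct lookup by timer family id instead of scanning a list.
  TimerEndpointSketch get(String timerFamilyId) {
    TimerEndpointSketch endpoint = byFamilyId.get(timerFamilyId);
    if (endpoint == null) {
      // Assumption: an unknown id is treated as an error.
      throw new IllegalArgumentException("Unknown timer family: " + timerFamilyId);
    }
    return endpoint;
  }
}
```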

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -858,7 +909,11 @@ public static BundleProcessor create(
 
     abstract HandleStateCallsForBundle getBeamFnStateClient();
 
-    abstract QueueingBeamFnDataClient getQueueingClient();
+    abstract List<Endpoints.ApiServiceDescriptor> getInboundEndpointApiServiceDescriptors();
+
+    abstract List<DataEndpoint<?>> getInboundDataEndpoints();
+
+    abstract List<TimerEndpoint<?>> getTimerEndpoints();

Review comment:
       Do these need to be mutable lists (i.e. will they be modified after creation), or are they just mutable because of the setup and `finish()` logic?







[GitHub] [beam] lukecwik commented on pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15807:
URL: https://github.com/apache/beam/pull/15807#issuecomment-953137436


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15807:
URL: https://github.com/apache/beam/pull/15807#issuecomment-952376070


   Run Java Spark PortableValidatesRunner Batch





[GitHub] [beam] lukecwik commented on a change in pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15807:
URL: https://github.com/apache/beam/pull/15807#discussion_r738919427



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java
##########
@@ -49,7 +49,10 @@
  *
  * <p>TODO: Add support for multiplexing over multiple outbound observers by stickying the output
  * location with a specific outbound observer.
+ *
+ * @deprecated Migrate to {@link BeamFnDataGrpcMultiplexer2}.

Review comment:
       Yeah, the OSS runners use these to process bundles and the Dataflow JRH has an annoying usage.
   
   Cleaning up the OSS runners usage isn't too hard but cleaning up the Dataflow JRH usage is a waste of time and I would rather wait for it to be deleted.




[GitHub] [beam] lukecwik commented on pull request #15807: [BEAM-13015] Migrate bundle processing in the SDK harness to using BeamFnDataInboundObserver2 and BeamFnDataGrpcMultiplexer2.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15807:
URL: https://github.com/apache/beam/pull/15807#issuecomment-959746130


   > It is possible this broke the `Run Java Spark PortableValidatesRunner Batch` test. After this change the test is reliably timing out ([BEAM-13164](https://issues.apache.org/jira/browse/BEAM-13164)). I have #15861 to up the timeout, but it looks like tests may be failing as well.
   
   Thanks, taking a look.

