Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/23 18:20:01 UTC

[GitHub] [beam] y1chi opened a new pull request #16051: [BEAM-13193] Support StandardProtocols.Enum.CONTROL_REQUEST_ELEMENTS_…

y1chi opened a new pull request #16051:
URL: https://github.com/apache/beam/pull/16051


   …EMBEDDING in Java SDK harness.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] y1chi commented on a change in pull request #16051: [BEAM-13193] Support StandardProtocols.Enum.CONTROL_REQUEST_ELEMENTS_…

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #16051:
URL: https://github.com/apache/beam/pull/16051#discussion_r756533504



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java
##########
@@ -179,6 +118,69 @@ public void awaitCompletion() throws Exception {
     }
   }
 
+  /** Dispatches the data and timers from the elements to corresponding receivers. */
+  public void multiplexElements(Elements elements) throws Exception {

Review comment:
       It is done if the function returns without throwing an exception; if it is not done, the exception should be propagated to ProcessBundleHandler.







[GitHub] [beam] y1chi commented on a change in pull request #16051: [BEAM-13193] Support StandardProtocols.Enum.CONTROL_REQUEST_ELEMENTS_…

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #16051:
URL: https://github.com/apache/beam/pull/16051#discussion_r756534924



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java
##########
@@ -179,6 +118,69 @@ public void awaitCompletion() throws Exception {
     }
   }
 
+  /** Dispatches the data and timers from the elements to corresponding receivers. */
+  public void multiplexElements(Elements elements) throws Exception {

Review comment:
       I think this depends on whether we still want the runner to send the terminal elements when the runner already tells us that the bundle is complete with the embedded elements. In that case, the endpoint status tracking does not seem useful.







[GitHub] [beam] lukecwik commented on a change in pull request #16051: [BEAM-13193] Support StandardProtocols.Enum.CONTROL_REQUEST_ELEMENTS_…

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16051:
URL: https://github.com/apache/beam/pull/16051#discussion_r756473242



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java
##########
@@ -179,6 +118,69 @@ public void awaitCompletion() throws Exception {
     }
   }
 
+  /** Dispatches the data and timers from the elements to corresponding receivers. */
+  public void multiplexElements(Elements elements) throws Exception {

Review comment:
       Can this return true/false to indicate whether we are done, and can ProcessBundleHandler throw an error if we aren't done?
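
A minimal sketch of the shape being asked about, written against simplified stand-in types rather than the real Beam classes. `Chunk` and `DoneTrackingMultiplexer` are hypothetical names introduced only for illustration; the field `numEndpointsThatAreIncomplete` and the error wording mirror the diff quoted in this thread, but the actual change in the PR may differ.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for one data or timer entry; not the real BeamFnApi.Elements type. */
final class Chunk {
  final String transformId;
  final boolean isLast;

  Chunk(String transformId, boolean isLast) {
    this.transformId = transformId;
    this.isLast = isLast;
  }
}

/** Hypothetical, simplified observer that tracks which endpoints have finished. */
final class DoneTrackingMultiplexer {
  private final Map<String, Boolean> endpointIsDone = new HashMap<>();
  private int numEndpointsThatAreIncomplete;

  DoneTrackingMultiplexer(List<String> transformIds) {
    for (String id : transformIds) {
      endpointIsDone.put(id, false);
    }
    numEndpointsThatAreIncomplete = endpointIsDone.size();
  }

  /** Dispatches the chunks and returns true once every endpoint has seen its last chunk. */
  boolean multiplexElements(List<Chunk> chunks) {
    for (Chunk chunk : chunks) {
      Boolean isDone = endpointIsDone.get(chunk.transformId);
      if (isDone == null) {
        throw new IllegalStateException(
            "Unable to find inbound data receiver for transform " + chunk.transformId);
      }
      if (isDone) {
        throw new IllegalStateException(
            "Received data after inbound data receiver is done for transform "
                + chunk.transformId);
      }
      // A real implementation would decode the payload and hand it to the receiver here.
      if (chunk.isLast) {
        endpointIsDone.put(chunk.transformId, true);
        numEndpointsThatAreIncomplete -= 1;
      }
    }
    return numEndpointsThatAreIncomplete == 0;
  }
}
```

With this shape, a caller that embeds all elements in the request could check the return value once and fail the bundle if it is false, while the streaming loop could simply return when it is true.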







[GitHub] [beam] lukecwik commented on a change in pull request #16051: [BEAM-13193] Support StandardProtocols.Enum.CONTROL_REQUEST_ELEMENTS_…

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16051:
URL: https://github.com/apache/beam/pull/16051#discussion_r755402988



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -464,6 +465,9 @@ public BundleFinalizer getBundleFinalizer() {
                 request.getInstructionId(),
                 bundleProcessor.getInboundEndpointApiServiceDescriptors(),
                 observer);
+            if (request.getProcessBundle().hasElements()) {
+              observer.accept(request.getProcessBundle().getElements());

Review comment:
       Wasn't the contract that all the data had to come from the ProcessBundleRequest, and that anything else would be an error? Or did we swap back to treating the embedded elements as a prefix (you have implemented the latter)?
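
To make the two contracts in this question concrete, here is a small sketch under heavy simplification. Batches are modeled as lists of strings and the marker `"LAST"` stands in for a terminal element; `EmbeddedElementsContract`, `processStrict`, and `processAsPrefix` are hypothetical names, not Beam APIs.

```java
import java.util.List;
import java.util.Queue;

/** Hypothetical model of the two possible contracts for elements embedded in a request. */
final class EmbeddedElementsContract {
  /** Assumption for this sketch: the string "LAST" marks the terminal element of a bundle. */
  static boolean isComplete(List<String> batch) {
    return batch.contains("LAST");
  }

  /** Strict contract: the embedded elements must be the whole bundle, otherwise it is an error. */
  static int processStrict(List<String> embedded) {
    if (!isComplete(embedded)) {
      throw new IllegalStateException("Embedded elements did not contain the complete bundle.");
    }
    return embedded.size();
  }

  /** Prefix contract: consume the embedded elements first, then keep reading the data stream. */
  static int processAsPrefix(List<String> embedded, Queue<List<String>> dataStream) {
    int consumed = embedded.size();
    boolean done = isComplete(embedded);
    while (!done) {
      List<String> next = dataStream.poll();
      if (next == null) {
        throw new IllegalStateException("Data stream ended before the terminal element.");
      }
      consumed += next.size();
      done = isComplete(next);
    }
    return consumed;
  }
}
```

The strict variant never touches the data stream, so there is nothing left to wait for once the call returns; the prefix variant has to keep done-state tracking alive across both sources.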







[GitHub] [beam] lukecwik merged pull request #16051: [BEAM-13193] Support StandardProtocols.Enum.CONTROL_REQUEST_ELEMENTS_…

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #16051:
URL: https://github.com/apache/beam/pull/16051


   





[GitHub] [beam] lukecwik commented on a change in pull request #16051: [BEAM-13193] Support StandardProtocols.Enum.CONTROL_REQUEST_ELEMENTS_…

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16051:
URL: https://github.com/apache/beam/pull/16051#discussion_r755438861



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java
##########
@@ -179,6 +118,80 @@ public void awaitCompletion() throws Exception {
     }
   }
 
+  /**
+   * Dispatches the data and timers from an elements object that is known to be complete. Should be
+   * used only if we know that all the data and timers for the bundle are contained in this single
+   * elements object. When invoked, data and timers are multiplexed to the corresponding receivers
+   * without endpoint done-state tracking (since it is guaranteed that all endpoints are done after
+   * the function call returns).
+   */
+  public void dispatchKnownCompleteElements(Elements elements) throws Exception {
+    multiplexElements(elements, false);
+  }
+
+  private void multiplexElements(Elements elements, boolean expectTerminalElements)

Review comment:
       What is the purpose of the boolean?
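
One possible reading of the Javadoc quoted above, sketched with stand-in types: the flag decides whether done-state bookkeeping runs at all. This is an assumption about intent, not the PR's actual implementation; `FlaggedElement`, `FlaggedMultiplexer`, and `acceptStreamedElements` are hypothetical names, while `dispatchKnownCompleteElements` and `expectTerminalElements` are taken from the diff.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a single data or timer entry. */
final class FlaggedElement {
  final String transformId;
  final String payload;
  final boolean isLast;

  FlaggedElement(String transformId, String payload, boolean isLast) {
    this.transformId = transformId;
    this.payload = payload;
    this.isLast = isLast;
  }
}

/** Hypothetical multiplexer with one private worker shared by two public entry points. */
final class FlaggedMultiplexer {
  private final Set<String> incompleteEndpoints;
  final List<String> delivered = new ArrayList<>();

  FlaggedMultiplexer(List<String> transformIds) {
    incompleteEndpoints = new HashSet<>(transformIds);
  }

  /** All elements of the bundle are in this one call, so no done-state tracking is performed. */
  void dispatchKnownCompleteElements(List<FlaggedElement> elements) {
    multiplexElements(elements, false);
  }

  /** Streaming path: tracks terminal elements and reports whether every endpoint is done. */
  boolean acceptStreamedElements(List<FlaggedElement> elements) {
    multiplexElements(elements, true);
    return incompleteEndpoints.isEmpty();
  }

  private void multiplexElements(List<FlaggedElement> elements, boolean expectTerminalElements) {
    for (FlaggedElement element : elements) {
      if (expectTerminalElements && !incompleteEndpoints.contains(element.transformId)) {
        throw new IllegalStateException(
            "Received data for unknown or already-done endpoint " + element.transformId);
      }
      delivered.add(element.transformId + ":" + element.payload);
      if (expectTerminalElements && element.isLast) {
        incompleteEndpoints.remove(element.transformId);
      }
    }
  }
}
```

If the flag only toggles bookkeeping like this, returning a done/not-done boolean from a single public method may make the extra parameter unnecessary.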







[GitHub] [beam] y1chi commented on pull request #16051: [BEAM-13193] Support StandardProtocols.Enum.CONTROL_REQUEST_ELEMENTS_…

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #16051:
URL: https://github.com/apache/beam/pull/16051#issuecomment-982983812


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on a change in pull request #16051: [BEAM-13193] Support StandardProtocols.Enum.CONTROL_REQUEST_ELEMENTS_…

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16051:
URL: https://github.com/apache/beam/pull/16051#discussion_r758826835



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java
##########
@@ -104,71 +105,9 @@ public void awaitCompletion() throws Exception {
     try {
       while (true) {
         BeamFnApi.Elements elements = queue.take();
-        for (BeamFnApi.Elements.Data data : elements.getDataList()) {
-          EndpointStatus<DataEndpoint<?>> endpoint =
-              transformIdToDataEndpoint.get(data.getTransformId());
-          if (endpoint == null) {
-            throw new IllegalStateException(
-                String.format(
-                    "Unable to find inbound data receiver for instruction %s and transform %s.",
-                    data.getInstructionId(), data.getTransformId()));
-          } else if (endpoint.isDone) {
-            throw new IllegalStateException(
-                String.format(
-                    "Received data after inbound data receiver is done for instruction %s and transform %s.",
-                    data.getInstructionId(), data.getTransformId()));
-          }
-          InputStream inputStream = data.getData().newInput();
-          Coder<Object> coder = (Coder<Object>) endpoint.endpoint.getCoder();
-          FnDataReceiver<Object> receiver =
-              (FnDataReceiver<Object>) endpoint.endpoint.getReceiver();
-          while (inputStream.available() > 0) {
-            receiver.accept(coder.decode(inputStream));
-          }
-          if (data.getIsLast()) {
-            endpoint.isDone = true;
-            numEndpointsThatAreIncomplete -= 1;
-            if (numEndpointsThatAreIncomplete == 0) {
-              return;
-            }
-          }
-        }
-
-        for (BeamFnApi.Elements.Timers timers : elements.getTimersList()) {
-          Map<String, EndpointStatus<TimerEndpoint<?>>> timerFamilyIdToEndpoints =
-              transformIdToTimerFamilyIdToTimerEndpoint.get(timers.getTransformId());
-          if (timerFamilyIdToEndpoints == null) {
-            throw new IllegalStateException(
-                String.format(
-                    "Unable to find inbound timer receiver for instruction %s, transform %s, and timer family %s.",
-                    timers.getInstructionId(), timers.getTransformId(), timers.getTimerFamilyId()));
-          }
-          EndpointStatus<TimerEndpoint<?>> endpoint =
-              timerFamilyIdToEndpoints.get(timers.getTimerFamilyId());
-          if (endpoint == null) {
-            throw new IllegalStateException(
-                String.format(
-                    "Unable to find inbound timer receiver for instruction %s, transform %s, and timer family %s.",
-                    timers.getInstructionId(), timers.getTransformId(), timers.getTimerFamilyId()));
-          } else if (endpoint.isDone) {
-            throw new IllegalStateException(
-                String.format(
-                    "Received timer after inbound timer receiver is done for instruction %s, transform %s, and timer family %s.",
-                    timers.getInstructionId(), timers.getTransformId(), timers.getTimerFamilyId()));
-          }
-          InputStream inputStream = timers.getTimers().newInput();
-          Coder<Object> coder = (Coder<Object>) endpoint.endpoint.getCoder();
-          FnDataReceiver<Object> receiver =
-              (FnDataReceiver<Object>) endpoint.endpoint.getReceiver();
-          while (inputStream.available() > 0) {
-            receiver.accept(coder.decode(inputStream));
-          }
-          if (timers.getIsLast()) {
-            numEndpointsThatAreIncomplete -= 1;
-            if (numEndpointsThatAreIncomplete == 0) {
-              return;
-            }
-          }
+        multiplexElements(elements);
+        if (numEndpointsThatAreIncomplete == 0) {
+          return;
         }

Review comment:
   ```suggestion
           if (multiplexElements(elements)) {
             return;
           }
   ```
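
The suggestion above implies that `multiplexElements` itself reports whether every endpoint has finished, so the caller no longer reads `numEndpointsThatAreIncomplete` directly. A minimal sketch of that shape follows. It is an assumption-laden illustration, not the harness code: `MultiplexSketch`, its `Element` type, and the shortened error messages are hypothetical stand-ins for `BeamFnApi.Elements` and the real endpoint bookkeeping.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for the harness's element multiplexer. */
final class MultiplexSketch {
  /** Simplified element: only the target transform id and the is-last marker. */
  static final class Element {
    final String transformId;
    final boolean isLast;

    Element(String transformId, boolean isLast) {
      this.transformId = transformId;
      this.isLast = isLast;
    }
  }

  // Maps each registered transform id to whether its last element was seen.
  private final Map<String, Boolean> endpointDone = new HashMap<>();
  private int numEndpointsThatAreIncomplete;

  MultiplexSketch(List<String> transformIds) {
    for (String id : transformIds) {
      endpointDone.put(id, false);
    }
    numEndpointsThatAreIncomplete = endpointDone.size();
  }

  /** Returns true once every registered endpoint has received its last element. */
  boolean multiplexElements(List<Element> elements) {
    for (Element element : elements) {
      Boolean done = endpointDone.get(element.transformId);
      if (done == null) {
        throw new IllegalStateException(
            "Unable to find inbound data receiver for transform " + element.transformId);
      }
      if (done) {
        throw new IllegalStateException(
            "Received data after inbound data receiver is done for transform "
                + element.transformId);
      }
      if (element.isLast) {
        endpointDone.put(element.transformId, true);
        numEndpointsThatAreIncomplete -= 1;
      }
    }
    return numEndpointsThatAreIncomplete == 0;
  }
}
```

With this shape the calling loop reduces to `if (multiplexElements(elements)) { return; }`, and the counter stays private to the multiplexing logic.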

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -790,6 +811,367 @@ public void testPTransformStartExceptionsArePropagated() {
     assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), empty());
   }
 
+  private static final class SimpleRecordingDoFn extends DoFn<KV<String, String>, String> {
+    private static final TupleTag<String> MAIN_OUTPUT_TAG = new TupleTag<>("mainOutput");
+    private static final String TIMER_FAMILY_ID = "timer_family";
+
+    @TimerFamily(TIMER_FAMILY_ID)
+    private final TimerSpec timer = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    static List<String> consumedData = new ArrayList<>();
+    static List<String> firedOnTimerCallbackTimerIds = new ArrayList<>();
+
+    @ProcessElement
+    public void processElement(ProcessContext context, BoundedWindow window) {}
+
+    @OnTimerFamily(TIMER_FAMILY_ID)
+    public void onTimer(@TimerId String timerId) {
+      firedOnTimerCallbackTimerIds.add(timerId);
+    }
+  }
+
+  private ProcessBundleHandler setupProcessBundleHanlderForSimpleRecordingDoFn() throws Exception {
+    SimpleRecordingDoFn.consumedData.clear();
+    SimpleRecordingDoFn.firedOnTimerCallbackTimerIds.clear();
+    DoFnWithExecutionInformation doFnWithExecutionInformation =
+        DoFnWithExecutionInformation.of(
+            new SimpleRecordingDoFn(),
+            SimpleRecordingDoFn.MAIN_OUTPUT_TAG,
+            Collections.emptyMap(),
+            DoFnSchemaInformation.create());
+    RunnerApi.FunctionSpec functionSpec =
+        RunnerApi.FunctionSpec.newBuilder()
+            .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
+            .setPayload(
+                ByteString.copyFrom(
+                    SerializableUtils.serializeToByteArray(doFnWithExecutionInformation)))
+            .build();
+    RunnerApi.ParDoPayload parDoPayload =
+        ParDoPayload.newBuilder()
+            .setDoFn(functionSpec)
+            .putTimerFamilySpecs(
+                "tfs-" + SimpleRecordingDoFn.TIMER_FAMILY_ID,
+                TimerFamilySpec.newBuilder()
+                    .setTimeDomain(RunnerApi.TimeDomain.Enum.EVENT_TIME)
+                    .setTimerFamilyCoderId("timer-coder")
+                    .build())
+            .build();
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        ProcessBundleDescriptor.newBuilder()
+            .putTransforms(
+                "2L",
+                PTransform.newBuilder()
+                    .setSpec(FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                    .putOutputs("2L-output", "2L-output-pc")
+                    .build())
+            .putTransforms(
+                "3L",
+                PTransform.newBuilder()
+                    .setSpec(
+                        FunctionSpec.newBuilder()
+                            .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+                            .setPayload(parDoPayload.toByteString()))
+                    .putInputs("3L-input", "2L-output-pc")
+                    .build())
+            .putPcollections(
+                "2L-output-pc",
+                PCollection.newBuilder()
+                    .setWindowingStrategyId("window-strategy")
+                    .setCoderId("2L-output-coder")
+                    .build())
+            .putWindowingStrategies(
+                "window-strategy",
+                WindowingStrategy.newBuilder()
+                    .setWindowCoderId("window-strategy-coder")
+                    .setWindowFn(
+                        FunctionSpec.newBuilder().setUrn("beam:window_fn:global_windows:v1"))
+                    .setOutputTime(OutputTime.Enum.END_OF_WINDOW)
+                    .setAccumulationMode(AccumulationMode.Enum.ACCUMULATING)
+                    .setTrigger(Trigger.newBuilder().setAlways(Always.getDefaultInstance()))
+                    .setClosingBehavior(ClosingBehavior.Enum.EMIT_ALWAYS)
+                    .setOnTimeBehavior(OnTimeBehavior.Enum.FIRE_ALWAYS)
+                    .build())
+            .setTimerApiServiceDescriptor(ApiServiceDescriptor.newBuilder().setUrl("url").build())
+            .putCoders("string_coder", CoderTranslation.toProto(StringUtf8Coder.of()).getCoder())
+            .putCoders(
+                "2L-output-coder",
+                Coder.newBuilder()
+                    .setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.KV_CODER_URN).build())
+                    .addComponentCoderIds("string_coder")
+                    .addComponentCoderIds("string_coder")
+                    .build())
+            .putCoders(
+                "window-strategy-coder",
+                Coder.newBuilder()
+                    .setSpec(
+                        FunctionSpec.newBuilder()
+                            .setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN)
+                            .build())
+                    .build())
+            .putCoders(
+                "timer-coder",
+                Coder.newBuilder()
+                    .setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.TIMER_CODER_URN))
+                    .addComponentCoderIds("string_coder")
+                    .addComponentCoderIds("window-strategy-coder")
+                    .build())
+            .build();
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+
+    Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap =
+        Maps.newHashMap(REGISTERED_RUNNER_FACTORIES);
+    urnToPTransformRunnerFactoryMap.put(
+        DATA_INPUT_URN,
+        (PTransformRunnerFactory<Object>)
+            (context) -> {
+              context.addIncomingDataEndpoint(
+                  ApiServiceDescriptor.getDefaultInstance(),
+                  KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()),
+                  (input) -> {
+                    SimpleRecordingDoFn.consumedData.add(input.getValue());
+                  });
+              return null;
+            });
+
+    Mockito.doAnswer(
+            (invocation) -> {
+              // A no op consumer for timers.
+              return new CloseableFnDataReceiver() {
+                @Override
+                public void accept(Object input) throws Exception {}
+
+                @Override
+                public void flush() throws Exception {}
+
+                @Override
+                public void close() throws Exception {}
+              };
+            })
+        .when(beamFnDataClient)
+        .send(any(), any(), any());
+
+    return new ProcessBundleHandler(
+        PipelineOptionsFactory.create(),
+        Collections.emptySet(),
+        fnApiRegistry::get,
+        beamFnDataClient,
+        null /* beamFnStateClient */,
+        null /* finalizeBundleHandler */,
+        new ShortIdMap(),
+        urnToPTransformRunnerFactoryMap,
+        new BundleProcessorCache());
+  }
+
+  @Test
+  public void testInstructionEmbeddedElementsAreProcessed() throws Exception {
+    ProcessBundleHandler handler = setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+    ByteString.Output encodedData = ByteString.newOutput();
+    KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), encodedData);
+    ByteString.Output encodedTimer = ByteString.newOutput();
+    Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
+        .encode(
+            Timer.of(
+                "",
+                "timer_id",
+                Collections.singletonList(GlobalWindow.INSTANCE),
+                Instant.ofEpochMilli(1L),
+                Instant.ofEpochMilli(1L),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING),
+            encodedTimer);
+
+    handler.processBundle(
+        InstructionRequest.newBuilder()
+            .setInstructionId("998L")
+            .setProcessBundle(
+                ProcessBundleRequest.newBuilder()
+                    .setProcessBundleDescriptorId("1L")
+                    .setElements(
+                        Elements.newBuilder()
+                            .addData(
+                                Data.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("2L")
+                                    .setData(encodedData.toByteString())
+                                    .build())
+                            .addData(
+                                Data.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("2L")
+                                    .setIsLast(true)
+                                    .build())
+                            .addTimers(
+                                Timers.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("3L")
+                                    .setTimerFamilyId(
+                                        TimerFamilyDeclaration.PREFIX
+                                            + SimpleRecordingDoFn.TIMER_FAMILY_ID)
+                                    .setTimers(encodedTimer.toByteString())
+                                    .build())
+                            .addTimers(
+                                Timers.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("3L")
+                                    .setTimerFamilyId(
+                                        TimerFamilyDeclaration.PREFIX
+                                            + SimpleRecordingDoFn.TIMER_FAMILY_ID)
+                                    .setIsLast(true)
+                                    .build())
+                            .build()))
+            .build());
+    handler.shutdown();
+    assertThat(SimpleRecordingDoFn.consumedData, contains("data"));
+    assertThat(SimpleRecordingDoFn.firedOnTimerCallbackTimerIds, contains("timer_id"));
+    // Register timer family outbound receiver.
+    verify(beamFnDataClient).send(any(), any(), any());
+    verifyNoMoreInteractions(beamFnDataClient);
+  }
+
+  @Test
+  public void testInstructionEmbeddedElementsWithMalformedData() throws Exception {
+    ProcessBundleHandler handler = setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+    ByteString.Output encodedData = ByteString.newOutput();
+    KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), encodedData);
+
+    assertThrows(
+        "Expect java.lang.IllegalStateException: Unable to find inbound data receiver for"
+            + " instruction 998L and transform 3L. But was not thrown or Exception did not match.",
+        IllegalStateException.class,
+        () ->
+            handler.processBundle(
+                InstructionRequest.newBuilder()
+                    .setInstructionId("998L")
+                    .setProcessBundle(
+                        ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L")
+                            .setElements(
+                                Elements.newBuilder()
+                                    .addData(
+                                        Data.newBuilder()
+                                            .setInstructionId("998L")
+                                            .setTransformId("3L")
+                                            .setData(encodedData.toByteString())
+                                            .build())
+                                    .build()))
+                    .build()));
+    assertThrows(
+        "Expect java.lang.RuntimeException: Elements embedded in ProcessBundleRequest are "
+            + "incomplete. But was not thrown or Exception did not match.",
+        RuntimeException.class,
+        () ->
+            handler.processBundle(
+                InstructionRequest.newBuilder()
+                    .setInstructionId("998L")
+                    .setProcessBundle(
+                        ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L")
+                            .setElements(
+                                Elements.newBuilder()
+                                    .addData(
+                                        Data.newBuilder()
+                                            .setInstructionId("998L")
+                                            .setTransformId("2L")
+                                            .setData(encodedData.toByteString())
+                                            .build())
+                                    .build()))
+                    .build()));
+    handler.shutdown();
+  }
+
+  @Test
+  public void testInstructionEmbeddedElementsWithMalformedTimers() throws Exception {
+    ProcessBundleHandler handler = setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+    ByteString.Output encodedTimer = ByteString.newOutput();
+    Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
+        .encode(
+            Timer.of(
+                "",
+                "timer_id",
+                Collections.singletonList(GlobalWindow.INSTANCE),
+                Instant.ofEpochMilli(1L),
+                Instant.ofEpochMilli(1L),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING),
+            encodedTimer);
+
+    assertThrows(
+        "Expect java.lang.IllegalStateException: Unable to find inbound timer receiver "
+            + "for instruction 998L, transform 4L, and timer family tfs-timer_family. But was not"
+            + " thrown or Exception did not match.",
+        IllegalStateException.class,
+        () ->
+            handler.processBundle(
+                InstructionRequest.newBuilder()
+                    .setInstructionId("998L")
+                    .setProcessBundle(
+                        ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L")
+                            .setElements(
+                                Elements.newBuilder()
+                                    .addTimers(
+                                        Timers.newBuilder()
+                                            .setInstructionId("998L")
+                                            .setTransformId("4L")
+                                            .setTimerFamilyId(
+                                                TimerFamilyDeclaration.PREFIX
+                                                    + SimpleRecordingDoFn.TIMER_FAMILY_ID)
+                                            .setTimers(encodedTimer.toByteString())
+                                            .build())
+                                    .build()))
+                    .build()));
+    assertThrows(
+        "Expect java.lang.IllegalStateException: Unable to find inbound timer receiver "
+            + "for instruction 998L, transform 3L, and timer family tfs-not_declared_id. But was "
+            + "not thrown or Exception did not match.",
+        IllegalStateException.class,
+        () ->
+            handler.processBundle(
+                InstructionRequest.newBuilder()
+                    .setInstructionId("998L")
+                    .setProcessBundle(
+                        ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L")
+                            .setElements(
+                                Elements.newBuilder()
+                                    .addTimers(
+                                        Timers.newBuilder()
+                                            .setInstructionId("998L")
+                                            .setTransformId("3L")
+                                            .setTimerFamilyId(
+                                                TimerFamilyDeclaration.PREFIX + "not_declared_id")
+                                            .setTimers(encodedTimer.toByteString())
+                                            .build())
+                                    .build()))
+                    .build()));
+    assertThrows(
+        "Expect java.lang.RuntimeException: Elements embedded in ProcessBundleRequest are"

Review comment:
       "But was not thrown or Exception did not match" ???
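
One way to read the question above: assuming the test uses JUnit 4's `assertThrows(String, Class, ThrowingRunnable)`, the string argument is only printed when the assertion fails and is never compared with the exception's own message. Checking the message would mean inspecting the exception that is thrown. The sketch below shows that idea in plain Java so it runs without JUnit. `expectThrows` and `processBundleWithUnknownTransform` are hypothetical helpers, not part of the PR or of JUnit.

```java
import java.util.concurrent.Callable;

/** Hypothetical stand-in showing how to check the thrown exception's message. */
final class ExpectThrowsSketch {
  /** Runs the body and returns the exception if it has the expected type. */
  static <T extends Throwable> T expectThrows(Class<T> expected, Callable<?> body) {
    try {
      body.call();
    } catch (Throwable t) {
      if (expected.isInstance(t)) {
        return expected.cast(t);
      }
      throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
    }
    throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
  }

  /** Stand-in for a processBundle call that targets an unknown data receiver. */
  static Object processBundleWithUnknownTransform() {
    throw new IllegalStateException(
        "Unable to find inbound data receiver for instruction 998L and transform 3L.");
  }
}
```

A caller can then assert on `expectThrows(...).getMessage()` directly, which keeps the failure description separate from the expected exception text.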

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -790,6 +811,367 @@ public void testPTransformStartExceptionsArePropagated() {
     assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), empty());
   }
 
+  private static final class SimpleRecordingDoFn extends DoFn<KV<String, String>, String> {
+    private static final TupleTag<String> MAIN_OUTPUT_TAG = new TupleTag<>("mainOutput");
+    private static final String TIMER_FAMILY_ID = "timer_family";
+
+    @TimerFamily(TIMER_FAMILY_ID)
+    private final TimerSpec timer = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    static List<String> consumedData = new ArrayList<>();
+    static List<String> firedOnTimerCallbackTimerIds = new ArrayList<>();
+
+    @ProcessElement
+    public void processElement(ProcessContext context, BoundedWindow window) {}
+
+    @OnTimerFamily(TIMER_FAMILY_ID)
+    public void onTimer(@TimerId String timerId) {
+      firedOnTimerCallbackTimerIds.add(timerId);
+    }
+  }
+
+  private ProcessBundleHandler setupProcessBundleHanlderForSimpleRecordingDoFn() throws Exception {
+    SimpleRecordingDoFn.consumedData.clear();
+    SimpleRecordingDoFn.firedOnTimerCallbackTimerIds.clear();
+    DoFnWithExecutionInformation doFnWithExecutionInformation =
+        DoFnWithExecutionInformation.of(
+            new SimpleRecordingDoFn(),
+            SimpleRecordingDoFn.MAIN_OUTPUT_TAG,
+            Collections.emptyMap(),
+            DoFnSchemaInformation.create());
+    RunnerApi.FunctionSpec functionSpec =
+        RunnerApi.FunctionSpec.newBuilder()
+            .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
+            .setPayload(
+                ByteString.copyFrom(
+                    SerializableUtils.serializeToByteArray(doFnWithExecutionInformation)))
+            .build();
+    RunnerApi.ParDoPayload parDoPayload =
+        ParDoPayload.newBuilder()
+            .setDoFn(functionSpec)
+            .putTimerFamilySpecs(
+                "tfs-" + SimpleRecordingDoFn.TIMER_FAMILY_ID,
+                TimerFamilySpec.newBuilder()
+                    .setTimeDomain(RunnerApi.TimeDomain.Enum.EVENT_TIME)
+                    .setTimerFamilyCoderId("timer-coder")
+                    .build())
+            .build();
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        ProcessBundleDescriptor.newBuilder()
+            .putTransforms(
+                "2L",
+                PTransform.newBuilder()
+                    .setSpec(FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                    .putOutputs("2L-output", "2L-output-pc")
+                    .build())
+            .putTransforms(
+                "3L",
+                PTransform.newBuilder()
+                    .setSpec(
+                        FunctionSpec.newBuilder()
+                            .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+                            .setPayload(parDoPayload.toByteString()))
+                    .putInputs("3L-input", "2L-output-pc")
+                    .build())
+            .putPcollections(
+                "2L-output-pc",
+                PCollection.newBuilder()
+                    .setWindowingStrategyId("window-strategy")
+                    .setCoderId("2L-output-coder")
+                    .build())
+            .putWindowingStrategies(
+                "window-strategy",
+                WindowingStrategy.newBuilder()
+                    .setWindowCoderId("window-strategy-coder")
+                    .setWindowFn(
+                        FunctionSpec.newBuilder().setUrn("beam:window_fn:global_windows:v1"))
+                    .setOutputTime(OutputTime.Enum.END_OF_WINDOW)
+                    .setAccumulationMode(AccumulationMode.Enum.ACCUMULATING)
+                    .setTrigger(Trigger.newBuilder().setAlways(Always.getDefaultInstance()))
+                    .setClosingBehavior(ClosingBehavior.Enum.EMIT_ALWAYS)
+                    .setOnTimeBehavior(OnTimeBehavior.Enum.FIRE_ALWAYS)
+                    .build())
+            .setTimerApiServiceDescriptor(ApiServiceDescriptor.newBuilder().setUrl("url").build())
+            .putCoders("string_coder", CoderTranslation.toProto(StringUtf8Coder.of()).getCoder())
+            .putCoders(
+                "2L-output-coder",
+                Coder.newBuilder()
+                    .setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.KV_CODER_URN).build())
+                    .addComponentCoderIds("string_coder")
+                    .addComponentCoderIds("string_coder")
+                    .build())
+            .putCoders(
+                "window-strategy-coder",
+                Coder.newBuilder()
+                    .setSpec(
+                        FunctionSpec.newBuilder()
+                            .setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN)
+                            .build())
+                    .build())
+            .putCoders(
+                "timer-coder",
+                Coder.newBuilder()
+                    .setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.TIMER_CODER_URN))
+                    .addComponentCoderIds("string_coder")
+                    .addComponentCoderIds("window-strategy-coder")
+                    .build())
+            .build();
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+
+    Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap =
+        Maps.newHashMap(REGISTERED_RUNNER_FACTORIES);
+    urnToPTransformRunnerFactoryMap.put(
+        DATA_INPUT_URN,
+        (PTransformRunnerFactory<Object>)
+            (context) -> {
+              context.addIncomingDataEndpoint(
+                  ApiServiceDescriptor.getDefaultInstance(),
+                  KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()),
+                  (input) -> {
+                    SimpleRecordingDoFn.consumedData.add(input.getValue());
+                  });
+              return null;
+            });
+
+    Mockito.doAnswer(
+            (invocation) -> {
+              // A no op consumer for timers.
+              return new CloseableFnDataReceiver() {
+                @Override
+                public void accept(Object input) throws Exception {}
+
+                @Override
+                public void flush() throws Exception {}
+
+                @Override
+                public void close() throws Exception {}
+              };
+            })
+        .when(beamFnDataClient)
+        .send(any(), any(), any());
+
+    return new ProcessBundleHandler(
+        PipelineOptionsFactory.create(),
+        Collections.emptySet(),
+        fnApiRegistry::get,
+        beamFnDataClient,
+        null /* beamFnStateClient */,
+        null /* finalizeBundleHandler */,
+        new ShortIdMap(),
+        urnToPTransformRunnerFactoryMap,
+        new BundleProcessorCache());
+  }
+
+  @Test
+  public void testInstructionEmbeddedElementsAreProcessed() throws Exception {
+    ProcessBundleHandler handler = setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+    ByteString.Output encodedData = ByteString.newOutput();
+    KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), encodedData);
+    ByteString.Output encodedTimer = ByteString.newOutput();
+    Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
+        .encode(
+            Timer.of(
+                "",
+                "timer_id",
+                Collections.singletonList(GlobalWindow.INSTANCE),
+                Instant.ofEpochMilli(1L),
+                Instant.ofEpochMilli(1L),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING),
+            encodedTimer);
+
+    handler.processBundle(
+        InstructionRequest.newBuilder()
+            .setInstructionId("998L")
+            .setProcessBundle(
+                ProcessBundleRequest.newBuilder()
+                    .setProcessBundleDescriptorId("1L")
+                    .setElements(
+                        Elements.newBuilder()
+                            .addData(
+                                Data.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("2L")
+                                    .setData(encodedData.toByteString())
+                                    .build())
+                            .addData(
+                                Data.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("2L")
+                                    .setIsLast(true)
+                                    .build())
+                            .addTimers(
+                                Timers.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("3L")
+                                    .setTimerFamilyId(
+                                        TimerFamilyDeclaration.PREFIX
+                                            + SimpleRecordingDoFn.TIMER_FAMILY_ID)
+                                    .setTimers(encodedTimer.toByteString())
+                                    .build())
+                            .addTimers(
+                                Timers.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("3L")
+                                    .setTimerFamilyId(
+                                        TimerFamilyDeclaration.PREFIX
+                                            + SimpleRecordingDoFn.TIMER_FAMILY_ID)
+                                    .setIsLast(true)
+                                    .build())
+                            .build()))
+            .build());
+    handler.shutdown();
+    assertThat(SimpleRecordingDoFn.consumedData, contains("data"));
+    assertThat(SimpleRecordingDoFn.firedOnTimerCallbackTimerIds, contains("timer_id"));
+    // Register timer family outbound receiver.
+    verify(beamFnDataClient).send(any(), any(), any());
+    verifyNoMoreInteractions(beamFnDataClient);
+  }
+
+  @Test
+  public void testInstructionEmbeddedElementsWithMalformedData() throws Exception {
+    ProcessBundleHandler handler = setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+    ByteString.Output encodedData = ByteString.newOutput();
+    KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), encodedData);
+
+    assertThrows(
+        "Expect java.lang.IllegalStateException: Unable to find inbound data receiver for"
+            + " instruction 998L and transform 3L. But was not thrown or Exception did not match.",
+        IllegalStateException.class,
+        () ->
+            handler.processBundle(
+                InstructionRequest.newBuilder()
+                    .setInstructionId("998L")
+                    .setProcessBundle(
+                        ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L")
+                            .setElements(
+                                Elements.newBuilder()
+                                    .addData(
+                                        Data.newBuilder()
+                                            .setInstructionId("998L")
+                                            .setTransformId("3L")
+                                            .setData(encodedData.toByteString())
+                                            .build())
+                                    .build()))
+                    .build()));
+    assertThrows(
+        "Expect java.lang.RuntimeException: Elements embedded in ProcessBundleRequest are "
+            + "incomplete. But was not thrown or Exception did not match.",

Review comment:
       "But was not thrown or Exception did not match" ???

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -790,6 +811,367 @@ public void testPTransformStartExceptionsArePropagated() {
     assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), empty());
   }
 
+  private static final class SimpleRecordingDoFn extends DoFn<KV<String, String>, String> {
+    private static final TupleTag<String> MAIN_OUTPUT_TAG = new TupleTag<>("mainOutput");
+    private static final String TIMER_FAMILY_ID = "timer_family";
+
+    @TimerFamily(TIMER_FAMILY_ID)
+    private final TimerSpec timer = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    static List<String> consumedData = new ArrayList<>();
+    static List<String> firedOnTimerCallbackTimerIds = new ArrayList<>();
+
+    @ProcessElement
+    public void processElement(ProcessContext context, BoundedWindow window) {}
+
+    @OnTimerFamily(TIMER_FAMILY_ID)
+    public void onTimer(@TimerId String timerId) {
+      firedOnTimerCallbackTimerIds.add(timerId);
+    }
+  }
+
+  private ProcessBundleHandler setupProcessBundleHanlderForSimpleRecordingDoFn() throws Exception {
+    SimpleRecordingDoFn.consumedData.clear();
+    SimpleRecordingDoFn.firedOnTimerCallbackTimerIds.clear();
+    DoFnWithExecutionInformation doFnWithExecutionInformation =
+        DoFnWithExecutionInformation.of(
+            new SimpleRecordingDoFn(),
+            SimpleRecordingDoFn.MAIN_OUTPUT_TAG,
+            Collections.emptyMap(),
+            DoFnSchemaInformation.create());
+    RunnerApi.FunctionSpec functionSpec =
+        RunnerApi.FunctionSpec.newBuilder()
+            .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
+            .setPayload(
+                ByteString.copyFrom(
+                    SerializableUtils.serializeToByteArray(doFnWithExecutionInformation)))
+            .build();
+    RunnerApi.ParDoPayload parDoPayload =
+        ParDoPayload.newBuilder()
+            .setDoFn(functionSpec)
+            .putTimerFamilySpecs(
+                "tfs-" + SimpleRecordingDoFn.TIMER_FAMILY_ID,
+                TimerFamilySpec.newBuilder()
+                    .setTimeDomain(RunnerApi.TimeDomain.Enum.EVENT_TIME)
+                    .setTimerFamilyCoderId("timer-coder")
+                    .build())
+            .build();
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        ProcessBundleDescriptor.newBuilder()
+            .putTransforms(
+                "2L",
+                PTransform.newBuilder()
+                    .setSpec(FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                    .putOutputs("2L-output", "2L-output-pc")
+                    .build())
+            .putTransforms(
+                "3L",
+                PTransform.newBuilder()
+                    .setSpec(
+                        FunctionSpec.newBuilder()
+                            .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+                            .setPayload(parDoPayload.toByteString()))
+                    .putInputs("3L-input", "2L-output-pc")
+                    .build())
+            .putPcollections(
+                "2L-output-pc",
+                PCollection.newBuilder()
+                    .setWindowingStrategyId("window-strategy")
+                    .setCoderId("2L-output-coder")
+                    .build())
+            .putWindowingStrategies(
+                "window-strategy",
+                WindowingStrategy.newBuilder()
+                    .setWindowCoderId("window-strategy-coder")
+                    .setWindowFn(
+                        FunctionSpec.newBuilder().setUrn("beam:window_fn:global_windows:v1"))
+                    .setOutputTime(OutputTime.Enum.END_OF_WINDOW)
+                    .setAccumulationMode(AccumulationMode.Enum.ACCUMULATING)
+                    .setTrigger(Trigger.newBuilder().setAlways(Always.getDefaultInstance()))
+                    .setClosingBehavior(ClosingBehavior.Enum.EMIT_ALWAYS)
+                    .setOnTimeBehavior(OnTimeBehavior.Enum.FIRE_ALWAYS)
+                    .build())
+            .setTimerApiServiceDescriptor(ApiServiceDescriptor.newBuilder().setUrl("url").build())
+            .putCoders("string_coder", CoderTranslation.toProto(StringUtf8Coder.of()).getCoder())
+            .putCoders(
+                "2L-output-coder",
+                Coder.newBuilder()
+                    .setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.KV_CODER_URN).build())
+                    .addComponentCoderIds("string_coder")
+                    .addComponentCoderIds("string_coder")
+                    .build())
+            .putCoders(
+                "window-strategy-coder",
+                Coder.newBuilder()
+                    .setSpec(
+                        FunctionSpec.newBuilder()
+                            .setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN)
+                            .build())
+                    .build())
+            .putCoders(
+                "timer-coder",
+                Coder.newBuilder()
+                    .setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.TIMER_CODER_URN))
+                    .addComponentCoderIds("string_coder")
+                    .addComponentCoderIds("window-strategy-coder")
+                    .build())
+            .build();
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+
+    Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap =
+        Maps.newHashMap(REGISTERED_RUNNER_FACTORIES);
+    urnToPTransformRunnerFactoryMap.put(
+        DATA_INPUT_URN,
+        (PTransformRunnerFactory<Object>)
+            (context) -> {
+              context.addIncomingDataEndpoint(
+                  ApiServiceDescriptor.getDefaultInstance(),
+                  KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()),
+                  (input) -> {
+                    SimpleRecordingDoFn.consumedData.add(input.getValue());
+                  });
+              return null;
+            });
+
+    Mockito.doAnswer(
+            (invocation) -> {
+              // A no op consumer for timers.
+              return new CloseableFnDataReceiver() {
+                @Override
+                public void accept(Object input) throws Exception {}
+
+                @Override
+                public void flush() throws Exception {}
+
+                @Override
+                public void close() throws Exception {}
+              };
+            })
+        .when(beamFnDataClient)
+        .send(any(), any(), any());
+
+    return new ProcessBundleHandler(
+        PipelineOptionsFactory.create(),
+        Collections.emptySet(),
+        fnApiRegistry::get,
+        beamFnDataClient,
+        null /* beamFnStateClient */,
+        null /* finalizeBundleHandler */,
+        new ShortIdMap(),
+        urnToPTransformRunnerFactoryMap,
+        new BundleProcessorCache());
+  }
+
+  @Test
+  public void testInstructionEmbeddedElementsAreProcessed() throws Exception {
+    ProcessBundleHandler handler = setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+    ByteString.Output encodedData = ByteString.newOutput();
+    KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), encodedData);
+    ByteString.Output encodedTimer = ByteString.newOutput();
+    Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
+        .encode(
+            Timer.of(
+                "",
+                "timer_id",
+                Collections.singletonList(GlobalWindow.INSTANCE),
+                Instant.ofEpochMilli(1L),
+                Instant.ofEpochMilli(1L),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING),
+            encodedTimer);
+
+    handler.processBundle(
+        InstructionRequest.newBuilder()
+            .setInstructionId("998L")
+            .setProcessBundle(
+                ProcessBundleRequest.newBuilder()
+                    .setProcessBundleDescriptorId("1L")
+                    .setElements(
+                        Elements.newBuilder()
+                            .addData(
+                                Data.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("2L")
+                                    .setData(encodedData.toByteString())
+                                    .build())
+                            .addData(
+                                Data.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("2L")
+                                    .setIsLast(true)
+                                    .build())
+                            .addTimers(
+                                Timers.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("3L")
+                                    .setTimerFamilyId(
+                                        TimerFamilyDeclaration.PREFIX
+                                            + SimpleRecordingDoFn.TIMER_FAMILY_ID)
+                                    .setTimers(encodedTimer.toByteString())
+                                    .build())
+                            .addTimers(
+                                Timers.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("3L")
+                                    .setTimerFamilyId(
+                                        TimerFamilyDeclaration.PREFIX
+                                            + SimpleRecordingDoFn.TIMER_FAMILY_ID)
+                                    .setIsLast(true)
+                                    .build())
+                            .build()))
+            .build());
+    handler.shutdown();
+    assertThat(SimpleRecordingDoFn.consumedData, contains("data"));
+    assertThat(SimpleRecordingDoFn.firedOnTimerCallbackTimerIds, contains("timer_id"));
+    // Register timer family outbound receiver.
+    verify(beamFnDataClient).send(any(), any(), any());
+    verifyNoMoreInteractions(beamFnDataClient);
+  }
+
+  @Test
+  public void testInstructionEmbeddedElementsWithMalformedData() throws Exception {
+    ProcessBundleHandler handler = setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+    ByteString.Output encodedData = ByteString.newOutput();
+    KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), encodedData);
+
+    assertThrows(
+        "Expect java.lang.IllegalStateException: Unable to find inbound data receiver for"
+            + " instruction 998L and transform 3L. But was not thrown or Exception did not match.",

Review comment:
       "But was not thrown or Exception did not match" ??? This reads oddly as an assertion message.
   
   The failure message makes it look as if the wrong outcome is being checked, rather than the exception that was added specifically for this case.
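
One possible way to address this, sketched under assumptions: capture the thrown exception and assert on its message, so that a different `IllegalStateException` cannot satisfy the test by accident. JUnit's `assertThrows` returns the caught exception, which allows the same pattern; the version below uses only the JDK so it stands alone. `ExpectThrowsSketch`, `expectThrows` and `process` are hypothetical names for illustration and are not part of Beam or of this PR.

```java
final class ExpectThrowsSketch {
  /**
   * Hypothetical helper: runs the body and returns the thrown exception if it
   * has the expected type; fails otherwise.
   */
  static <T extends Throwable> T expectThrows(Class<T> type, Runnable body) {
    try {
      body.run();
    } catch (Throwable t) {
      if (type.isInstance(t)) {
        return type.cast(t);
      }
      throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
    }
    throw new AssertionError("Expected " + type.getName() + " but nothing was thrown");
  }

  /** Stand-in for the call under test; only transform "2L" has a receiver here. */
  static void process(String transformId) {
    if (!"2L".equals(transformId)) {
      throw new IllegalStateException(
          "Unable to find inbound data receiver for instruction 998L and transform "
              + transformId
              + ".");
    }
  }

  public static void main(String[] args) {
    IllegalStateException e =
        expectThrows(IllegalStateException.class, () -> process("3L"));
    // Checking the message pins the test to the specific failure it targets.
    if (!e.getMessage().contains("transform 3L")) {
      throw new AssertionError("Wrong failure: " + e.getMessage());
    }
  }
}
```

With this shape the assertion message argument can be dropped, since the check on the exception text already states what is expected.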

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -458,7 +458,16 @@ public BundleFinalizer getBundleFinalizer() {
             startFunction.run();
           }
 
-          if (!bundleProcessor.getInboundEndpointApiServiceDescriptors().isEmpty()) {
+          if (request.getProcessBundle().hasElements()) {
+            boolean inputFinished =
+                bundleProcessor
+                    .getInboundObserver()
+                    .multiplexElements(request.getProcessBundle().getElements());
+            if (!inputFinished) {
+              throw new RuntimeException(
+                  "Elements embedded in ProcessBundleRequest are incomplete.");

Review comment:
       nit: In addition to the improved message, it would be nice to enumerate the inputs that are missing stream terminators.
   
   ```suggestion
                     "Elements embedded in ProcessBundleRequest do not contain stream terminators for all data and timer inputs.");
   ```
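
A rough sketch of what enumerating the missing inputs could look like, assuming the harness can obtain the set of expected inbound endpoints and the set that has already seen `is_last`. The class name, method name, and the string keys used for endpoints are all hypothetical; the real observer in the harness may track this differently.

```java
import java.util.Set;
import java.util.TreeSet;

final class MissingTerminatorsSketch {
  /**
   * Returns an error message listing the inputs that never received a stream
   * terminator, or an empty string when every input was terminated.
   */
  static String describeMissing(Set<String> expectedInputs, Set<String> terminatedInputs) {
    // TreeSet keeps the listing in a stable, sorted order.
    Set<String> missing = new TreeSet<>(expectedInputs);
    missing.removeAll(terminatedInputs);
    if (missing.isEmpty()) {
      return "";
    }
    return "Elements embedded in ProcessBundleRequest do not contain stream terminators for "
        + "all data and timer inputs. Missing: "
        + missing;
  }
}
```

The caller would throw with this message only when the returned string is non-empty.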
   
   

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -790,6 +811,367 @@ public void testPTransformStartExceptionsArePropagated() {
     assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), empty());
   }
 
+  private static final class SimpleRecordingDoFn extends DoFn<KV<String, String>, String> {
+    private static final TupleTag<String> MAIN_OUTPUT_TAG = new TupleTag<>("mainOutput");
+    private static final String TIMER_FAMILY_ID = "timer_family";
+
+    @TimerFamily(TIMER_FAMILY_ID)
+    private final TimerSpec timer = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    static List<String> consumedData = new ArrayList<>();
+    static List<String> firedOnTimerCallbackTimerIds = new ArrayList<>();
+
+    @ProcessElement
+    public void processElement(ProcessContext context, BoundedWindow window) {}
+
+    @OnTimerFamily(TIMER_FAMILY_ID)
+    public void onTimer(@TimerId String timerId) {
+      firedOnTimerCallbackTimerIds.add(timerId);
+    }
+  }
+
+  private ProcessBundleHandler setupProcessBundleHanlderForSimpleRecordingDoFn() throws Exception {
+    SimpleRecordingDoFn.consumedData.clear();
+    SimpleRecordingDoFn.firedOnTimerCallbackTimerIds.clear();
+    DoFnWithExecutionInformation doFnWithExecutionInformation =
+        DoFnWithExecutionInformation.of(
+            new SimpleRecordingDoFn(),
+            SimpleRecordingDoFn.MAIN_OUTPUT_TAG,
+            Collections.emptyMap(),
+            DoFnSchemaInformation.create());
+    RunnerApi.FunctionSpec functionSpec =
+        RunnerApi.FunctionSpec.newBuilder()
+            .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
+            .setPayload(
+                ByteString.copyFrom(
+                    SerializableUtils.serializeToByteArray(doFnWithExecutionInformation)))
+            .build();
+    RunnerApi.ParDoPayload parDoPayload =
+        ParDoPayload.newBuilder()
+            .setDoFn(functionSpec)
+            .putTimerFamilySpecs(
+                "tfs-" + SimpleRecordingDoFn.TIMER_FAMILY_ID,
+                TimerFamilySpec.newBuilder()
+                    .setTimeDomain(RunnerApi.TimeDomain.Enum.EVENT_TIME)
+                    .setTimerFamilyCoderId("timer-coder")
+                    .build())
+            .build();
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        ProcessBundleDescriptor.newBuilder()
+            .putTransforms(
+                "2L",
+                PTransform.newBuilder()
+                    .setSpec(FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                    .putOutputs("2L-output", "2L-output-pc")
+                    .build())
+            .putTransforms(
+                "3L",
+                PTransform.newBuilder()
+                    .setSpec(
+                        FunctionSpec.newBuilder()
+                            .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+                            .setPayload(parDoPayload.toByteString()))
+                    .putInputs("3L-input", "2L-output-pc")
+                    .build())
+            .putPcollections(
+                "2L-output-pc",
+                PCollection.newBuilder()
+                    .setWindowingStrategyId("window-strategy")
+                    .setCoderId("2L-output-coder")
+                    .build())
+            .putWindowingStrategies(
+                "window-strategy",
+                WindowingStrategy.newBuilder()
+                    .setWindowCoderId("window-strategy-coder")
+                    .setWindowFn(
+                        FunctionSpec.newBuilder().setUrn("beam:window_fn:global_windows:v1"))
+                    .setOutputTime(OutputTime.Enum.END_OF_WINDOW)
+                    .setAccumulationMode(AccumulationMode.Enum.ACCUMULATING)
+                    .setTrigger(Trigger.newBuilder().setAlways(Always.getDefaultInstance()))
+                    .setClosingBehavior(ClosingBehavior.Enum.EMIT_ALWAYS)
+                    .setOnTimeBehavior(OnTimeBehavior.Enum.FIRE_ALWAYS)
+                    .build())
+            .setTimerApiServiceDescriptor(ApiServiceDescriptor.newBuilder().setUrl("url").build())
+            .putCoders("string_coder", CoderTranslation.toProto(StringUtf8Coder.of()).getCoder())
+            .putCoders(
+                "2L-output-coder",
+                Coder.newBuilder()
+                    .setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.KV_CODER_URN).build())
+                    .addComponentCoderIds("string_coder")
+                    .addComponentCoderIds("string_coder")
+                    .build())
+            .putCoders(
+                "window-strategy-coder",
+                Coder.newBuilder()
+                    .setSpec(
+                        FunctionSpec.newBuilder()
+                            .setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN)
+                            .build())
+                    .build())
+            .putCoders(
+                "timer-coder",
+                Coder.newBuilder()
+                    .setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.TIMER_CODER_URN))
+                    .addComponentCoderIds("string_coder")
+                    .addComponentCoderIds("window-strategy-coder")
+                    .build())
+            .build();
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+
+    Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap =
+        Maps.newHashMap(REGISTERED_RUNNER_FACTORIES);
+    urnToPTransformRunnerFactoryMap.put(
+        DATA_INPUT_URN,
+        (PTransformRunnerFactory<Object>)
+            (context) -> {
+              context.addIncomingDataEndpoint(
+                  ApiServiceDescriptor.getDefaultInstance(),
+                  KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()),
+                  (input) -> {
+                    SimpleRecordingDoFn.consumedData.add(input.getValue());
+                  });
+              return null;
+            });
+
+    Mockito.doAnswer(
+            (invocation) -> {
+              // A no op consumer for timers.
+              return new CloseableFnDataReceiver() {
+                @Override
+                public void accept(Object input) throws Exception {}
+
+                @Override
+                public void flush() throws Exception {}
+
+                @Override
+                public void close() throws Exception {}
+              };
+            })
+        .when(beamFnDataClient)
+        .send(any(), any(), any());
+
+    return new ProcessBundleHandler(
+        PipelineOptionsFactory.create(),
+        Collections.emptySet(),
+        fnApiRegistry::get,
+        beamFnDataClient,
+        null /* beamFnStateClient */,
+        null /* finalizeBundleHandler */,
+        new ShortIdMap(),
+        urnToPTransformRunnerFactoryMap,
+        new BundleProcessorCache());
+  }
+
+  @Test
+  public void testInstructionEmbeddedElementsAreProcessed() throws Exception {
+    ProcessBundleHandler handler = setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+    ByteString.Output encodedData = ByteString.newOutput();
+    KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), encodedData);
+    ByteString.Output encodedTimer = ByteString.newOutput();
+    Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
+        .encode(
+            Timer.of(
+                "",
+                "timer_id",
+                Collections.singletonList(GlobalWindow.INSTANCE),
+                Instant.ofEpochMilli(1L),
+                Instant.ofEpochMilli(1L),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING),
+            encodedTimer);
+
+    handler.processBundle(
+        InstructionRequest.newBuilder()
+            .setInstructionId("998L")
+            .setProcessBundle(
+                ProcessBundleRequest.newBuilder()
+                    .setProcessBundleDescriptorId("1L")
+                    .setElements(
+                        Elements.newBuilder()
+                            .addData(
+                                Data.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("2L")
+                                    .setData(encodedData.toByteString())
+                                    .build())
+                            .addData(
+                                Data.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("2L")
+                                    .setIsLast(true)
+                                    .build())
+                            .addTimers(
+                                Timers.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("3L")
+                                    .setTimerFamilyId(
+                                        TimerFamilyDeclaration.PREFIX
+                                            + SimpleRecordingDoFn.TIMER_FAMILY_ID)
+                                    .setTimers(encodedTimer.toByteString())
+                                    .build())
+                            .addTimers(
+                                Timers.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("3L")
+                                    .setTimerFamilyId(
+                                        TimerFamilyDeclaration.PREFIX
+                                            + SimpleRecordingDoFn.TIMER_FAMILY_ID)
+                                    .setIsLast(true)
+                                    .build())
+                            .build()))
+            .build());
+    handler.shutdown();
+    assertThat(SimpleRecordingDoFn.consumedData, contains("data"));
+    assertThat(SimpleRecordingDoFn.firedOnTimerCallbackTimerIds, contains("timer_id"));
+    // Register timer family outbound receiver.
+    verify(beamFnDataClient).send(any(), any(), any());
+    verifyNoMoreInteractions(beamFnDataClient);
+  }
+
+  @Test
+  public void testInstructionEmbeddedElementsWithMalformedData() throws Exception {
+    ProcessBundleHandler handler = setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+    ByteString.Output encodedData = ByteString.newOutput();
+    KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), encodedData);
+
+    assertThrows(
+        "Expect java.lang.IllegalStateException: Unable to find inbound data receiver for"
+            + " instruction 998L and transform 3L. But was not thrown or Exception did not match.",
+        IllegalStateException.class,
+        () ->
+            handler.processBundle(
+                InstructionRequest.newBuilder()
+                    .setInstructionId("998L")
+                    .setProcessBundle(
+                        ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L")
+                            .setElements(
+                                Elements.newBuilder()
+                                    .addData(
+                                        Data.newBuilder()
+                                            .setInstructionId("998L")
+                                            .setTransformId("3L")
+                                            .setData(encodedData.toByteString())
+                                            .build())
+                                    .build()))
+                    .build()));
+    assertThrows(
+        "Expect java.lang.RuntimeException: Elements embedded in ProcessBundleRequest are "
+            + "incomplete. But was not thrown or Exception did not match.",
+        RuntimeException.class,
+        () ->
+            handler.processBundle(
+                InstructionRequest.newBuilder()
+                    .setInstructionId("998L")
+                    .setProcessBundle(
+                        ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L")
+                            .setElements(
+                                Elements.newBuilder()
+                                    .addData(
+                                        Data.newBuilder()
+                                            .setInstructionId("998L")
+                                            .setTransformId("2L")
+                                            .setData(encodedData.toByteString())
+                                            .build())
+                                    .build()))
+                    .build()));
+    handler.shutdown();
+  }
+
+  @Test
+  public void testInstructionEmbeddedElementsWithMalformedTimers() throws Exception {
+    ProcessBundleHandler handler = setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+    ByteString.Output encodedTimer = ByteString.newOutput();
+    Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
+        .encode(
+            Timer.of(
+                "",
+                "timer_id",
+                Collections.singletonList(GlobalWindow.INSTANCE),
+                Instant.ofEpochMilli(1L),
+                Instant.ofEpochMilli(1L),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING),
+            encodedTimer);
+
+    assertThrows(
+        "Expect java.lang.IllegalStateException: Unable to find inbound timer receiver "
+            + "for instruction 998L, transform 4L, and timer family tfs-timer_family. But was not"
+            + " thrown or Exception did not match.",
+        IllegalStateException.class,
+        () ->
+            handler.processBundle(
+                InstructionRequest.newBuilder()
+                    .setInstructionId("998L")
+                    .setProcessBundle(
+                        ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L")
+                            .setElements(
+                                Elements.newBuilder()
+                                    .addTimers(
+                                        Timers.newBuilder()
+                                            .setInstructionId("998L")
+                                            .setTransformId("4L")
+                                            .setTimerFamilyId(
+                                                TimerFamilyDeclaration.PREFIX
+                                                    + SimpleRecordingDoFn.TIMER_FAMILY_ID)
+                                            .setTimers(encodedTimer.toByteString())
+                                            .build())
+                                    .build()))
+                    .build()));
+    assertThrows(
+        "Expect java.lang.IllegalStateException: Unable to find inbound timer receiver "

Review comment:
       "But was not thrown or Exception did not match" ??? This reads oddly as an assertion message.

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -790,6 +811,367 @@ public void testPTransformStartExceptionsArePropagated() {
     assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), empty());
   }
 
+  private static final class SimpleRecordingDoFn extends DoFn<KV<String, String>, String> {
+    private static final TupleTag<String> MAIN_OUTPUT_TAG = new TupleTag<>("mainOutput");
+    private static final String TIMER_FAMILY_ID = "timer_family";
+
+    @TimerFamily(TIMER_FAMILY_ID)
+    private final TimerSpec timer = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    static List<String> consumedData = new ArrayList<>();

Review comment:
       It would be better to forward the inputs as outputs and validate the tests that way, rather than relying on statics, which can leave later tests in a bad state if something goes wrong.
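
To illustrate the idea in isolation, here is a minimal JDK-only sketch in which the function forwards what it sees to an output receiver owned by the test, so no state is shared between tests. `ForwardingSketch`, `ForwardingFn` and `runBundle` are hypothetical stand-ins; an actual change would emit through the DoFn's output and capture it from the outbound consumer instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ForwardingSketch {
  /** Stand-in for a DoFn that forwards inputs and fired timer ids as outputs. */
  static final class ForwardingFn {
    void processElement(String value, Consumer<String> output) {
      output.accept(value);
    }

    void onTimer(String timerId, Consumer<String> output) {
      output.accept("timer:" + timerId);
    }
  }

  /** Runs one simulated bundle and returns everything that was output. */
  static List<String> runBundle(List<String> values, List<String> timerIds) {
    // The list is local to this call, so a failed test cannot leak state.
    List<String> outputs = new ArrayList<>();
    ForwardingFn fn = new ForwardingFn();
    for (String value : values) {
      fn.processElement(value, outputs::add);
    }
    for (String timerId : timerIds) {
      fn.onTimer(timerId, outputs::add);
    }
    return outputs;
  }
}
```

Because each call builds its own output list, the setup method no longer needs to clear anything before a test runs.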
   

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -790,6 +811,367 @@ public void testPTransformStartExceptionsArePropagated() {
     assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), empty());
   }
 
+  private static final class SimpleRecordingDoFn extends DoFn<KV<String, String>, String> {
+    private static final TupleTag<String> MAIN_OUTPUT_TAG = new TupleTag<>("mainOutput");
+    private static final String TIMER_FAMILY_ID = "timer_family";
+
+    @TimerFamily(TIMER_FAMILY_ID)
+    private final TimerSpec timer = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    static List<String> consumedData = new ArrayList<>();
+    static List<String> firedOnTimerCallbackTimerIds = new ArrayList<>();
+
+    @ProcessElement
+    public void processElement(ProcessContext context, BoundedWindow window) {}
+
+    @OnTimerFamily(TIMER_FAMILY_ID)
+    public void onTimer(@TimerId String timerId) {
+      firedOnTimerCallbackTimerIds.add(timerId);
+    }
+  }
+
+  private ProcessBundleHandler setupProcessBundleHanlderForSimpleRecordingDoFn() throws Exception {
+    SimpleRecordingDoFn.consumedData.clear();
+    SimpleRecordingDoFn.firedOnTimerCallbackTimerIds.clear();
+    DoFnWithExecutionInformation doFnWithExecutionInformation =
+        DoFnWithExecutionInformation.of(
+            new SimpleRecordingDoFn(),
+            SimpleRecordingDoFn.MAIN_OUTPUT_TAG,
+            Collections.emptyMap(),
+            DoFnSchemaInformation.create());
+    RunnerApi.FunctionSpec functionSpec =
+        RunnerApi.FunctionSpec.newBuilder()
+            .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
+            .setPayload(
+                ByteString.copyFrom(
+                    SerializableUtils.serializeToByteArray(doFnWithExecutionInformation)))
+            .build();
+    RunnerApi.ParDoPayload parDoPayload =
+        ParDoPayload.newBuilder()
+            .setDoFn(functionSpec)
+            .putTimerFamilySpecs(
+                "tfs-" + SimpleRecordingDoFn.TIMER_FAMILY_ID,
+                TimerFamilySpec.newBuilder()
+                    .setTimeDomain(RunnerApi.TimeDomain.Enum.EVENT_TIME)
+                    .setTimerFamilyCoderId("timer-coder")
+                    .build())
+            .build();
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        ProcessBundleDescriptor.newBuilder()
+            .putTransforms(
+                "2L",
+                PTransform.newBuilder()
+                    .setSpec(FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                    .putOutputs("2L-output", "2L-output-pc")
+                    .build())
+            .putTransforms(
+                "3L",
+                PTransform.newBuilder()
+                    .setSpec(
+                        FunctionSpec.newBuilder()
+                            .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+                            .setPayload(parDoPayload.toByteString()))
+                    .putInputs("3L-input", "2L-output-pc")
+                    .build())
+            .putPcollections(
+                "2L-output-pc",
+                PCollection.newBuilder()
+                    .setWindowingStrategyId("window-strategy")
+                    .setCoderId("2L-output-coder")
+                    .build())
+            .putWindowingStrategies(
+                "window-strategy",
+                WindowingStrategy.newBuilder()
+                    .setWindowCoderId("window-strategy-coder")
+                    .setWindowFn(
+                        FunctionSpec.newBuilder().setUrn("beam:window_fn:global_windows:v1"))
+                    .setOutputTime(OutputTime.Enum.END_OF_WINDOW)
+                    .setAccumulationMode(AccumulationMode.Enum.ACCUMULATING)
+                    .setTrigger(Trigger.newBuilder().setAlways(Always.getDefaultInstance()))
+                    .setClosingBehavior(ClosingBehavior.Enum.EMIT_ALWAYS)
+                    .setOnTimeBehavior(OnTimeBehavior.Enum.FIRE_ALWAYS)
+                    .build())
+            .setTimerApiServiceDescriptor(ApiServiceDescriptor.newBuilder().setUrl("url").build())
+            .putCoders("string_coder", CoderTranslation.toProto(StringUtf8Coder.of()).getCoder())
+            .putCoders(
+                "2L-output-coder",
+                Coder.newBuilder()
+                    .setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.KV_CODER_URN).build())
+                    .addComponentCoderIds("string_coder")
+                    .addComponentCoderIds("string_coder")
+                    .build())
+            .putCoders(
+                "window-strategy-coder",
+                Coder.newBuilder()
+                    .setSpec(
+                        FunctionSpec.newBuilder()
+                            .setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN)
+                            .build())
+                    .build())
+            .putCoders(
+                "timer-coder",
+                Coder.newBuilder()
+                    .setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.TIMER_CODER_URN))
+                    .addComponentCoderIds("string_coder")
+                    .addComponentCoderIds("window-strategy-coder")
+                    .build())
+            .build();
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+
+    Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap =
+        Maps.newHashMap(REGISTERED_RUNNER_FACTORIES);
+    urnToPTransformRunnerFactoryMap.put(
+        DATA_INPUT_URN,
+        (PTransformRunnerFactory<Object>)
+            (context) -> {
+              context.addIncomingDataEndpoint(
+                  ApiServiceDescriptor.getDefaultInstance(),
+                  KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()),
+                  (input) -> {
+                    SimpleRecordingDoFn.consumedData.add(input.getValue());
+                  });
+              return null;
+            });
+
+    Mockito.doAnswer(
+            (invocation) -> {
+              // A no op consumer for timers.
+              return new CloseableFnDataReceiver() {
+                @Override
+                public void accept(Object input) throws Exception {}
+
+                @Override
+                public void flush() throws Exception {}
+
+                @Override
+                public void close() throws Exception {}
+              };
+            })
+        .when(beamFnDataClient)
+        .send(any(), any(), any());
+
+    return new ProcessBundleHandler(
+        PipelineOptionsFactory.create(),
+        Collections.emptySet(),
+        fnApiRegistry::get,
+        beamFnDataClient,
+        null /* beamFnStateClient */,
+        null /* finalizeBundleHandler */,
+        new ShortIdMap(),
+        urnToPTransformRunnerFactoryMap,
+        new BundleProcessorCache());
+  }
+
+  @Test
+  public void testInstructionEmbeddedElementsAreProcessed() throws Exception {
+    ProcessBundleHandler handler = setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+    ByteString.Output encodedData = ByteString.newOutput();
+    KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), encodedData);
+    ByteString.Output encodedTimer = ByteString.newOutput();
+    Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
+        .encode(
+            Timer.of(
+                "",
+                "timer_id",
+                Collections.singletonList(GlobalWindow.INSTANCE),
+                Instant.ofEpochMilli(1L),
+                Instant.ofEpochMilli(1L),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING),
+            encodedTimer);
+
+    handler.processBundle(
+        InstructionRequest.newBuilder()
+            .setInstructionId("998L")
+            .setProcessBundle(
+                ProcessBundleRequest.newBuilder()
+                    .setProcessBundleDescriptorId("1L")
+                    .setElements(
+                        Elements.newBuilder()
+                            .addData(
+                                Data.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("2L")
+                                    .setData(encodedData.toByteString())
+                                    .build())
+                            .addData(
+                                Data.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("2L")
+                                    .setIsLast(true)
+                                    .build())
+                            .addTimers(
+                                Timers.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("3L")
+                                    .setTimerFamilyId(
+                                        TimerFamilyDeclaration.PREFIX
+                                            + SimpleRecordingDoFn.TIMER_FAMILY_ID)
+                                    .setTimers(encodedTimer.toByteString())
+                                    .build())
+                            .addTimers(
+                                Timers.newBuilder()
+                                    .setInstructionId("998L")
+                                    .setTransformId("3L")
+                                    .setTimerFamilyId(
+                                        TimerFamilyDeclaration.PREFIX
+                                            + SimpleRecordingDoFn.TIMER_FAMILY_ID)
+                                    .setIsLast(true)
+                                    .build())
+                            .build()))
+            .build());
+    handler.shutdown();
+    assertThat(SimpleRecordingDoFn.consumedData, contains("data"));
+    assertThat(SimpleRecordingDoFn.firedOnTimerCallbackTimerIds, contains("timer_id"));
+    // Register timer family outbound receiver.
+    verify(beamFnDataClient).send(any(), any(), any());
+    verifyNoMoreInteractions(beamFnDataClient);
+  }
+
+  @Test
+  public void testInstructionEmbeddedElementsWithMalformedData() throws Exception {
+    ProcessBundleHandler handler = setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+    ByteString.Output encodedData = ByteString.newOutput();
+    KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), encodedData);
+
+    assertThrows(
+        "Expect java.lang.IllegalStateException: Unable to find inbound data receiver for"
+            + " instruction 998L and transform 3L. But was not thrown or Exception did not match.",
+        IllegalStateException.class,
+        () ->
+            handler.processBundle(
+                InstructionRequest.newBuilder()
+                    .setInstructionId("998L")
+                    .setProcessBundle(
+                        ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L")
+                            .setElements(
+                                Elements.newBuilder()
+                                    .addData(
+                                        Data.newBuilder()
+                                            .setInstructionId("998L")
+                                            .setTransformId("3L")
+                                            .setData(encodedData.toByteString())
+                                            .build())
+                                    .build()))
+                    .build()));
+    assertThrows(
+        "Expect java.lang.RuntimeException: Elements embedded in ProcessBundleRequest are "
+            + "incomplete. But was not thrown or Exception did not match.",
+        RuntimeException.class,
+        () ->
+            handler.processBundle(
+                InstructionRequest.newBuilder()
+                    .setInstructionId("998L")
+                    .setProcessBundle(
+                        ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L")
+                            .setElements(
+                                Elements.newBuilder()
+                                    .addData(
+                                        Data.newBuilder()
+                                            .setInstructionId("998L")
+                                            .setTransformId("2L")
+                                            .setData(encodedData.toByteString())
+                                            .build())
+                                    .build()))
+                    .build()));
+    handler.shutdown();
+  }
+
+  @Test
+  public void testInstructionEmbeddedElementsWithMalformedTimers() throws Exception {
+    ProcessBundleHandler handler = setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+    ByteString.Output encodedTimer = ByteString.newOutput();
+    Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
+        .encode(
+            Timer.of(
+                "",
+                "timer_id",
+                Collections.singletonList(GlobalWindow.INSTANCE),
+                Instant.ofEpochMilli(1L),
+                Instant.ofEpochMilli(1L),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING),
+            encodedTimer);
+
+    assertThrows(
+        "Expect java.lang.IllegalStateException: Unable to find inbound timer receiver "
+            + "for instruction 998L, transform 4L, and timer family tfs-timer_family. But was not"

Review comment:
       "But was not thrown or Exception did not match" ???




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #16051: [BEAM-13193] Support StandardProtocols.Enum.CONTROL_REQUEST_ELEMENTS_…

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16051:
URL: https://github.com/apache/beam/pull/16051#discussion_r758597801



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java
##########
@@ -179,6 +118,69 @@ public void awaitCompletion() throws Exception {
     }
   }
 
+  /** Dispatches the data and timers from the elements to corresponding receivers. */
+  public void multiplexElements(Elements elements) throws Exception {

Review comment:
       The purpose was to ensure that a malformed message from the runner leads to an error instead of the bundle silently completing.







[GitHub] [beam] lukecwik commented on a change in pull request #16051: [BEAM-13193] Support StandardProtocols.Enum.CONTROL_REQUEST_ELEMENTS_…

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16051:
URL: https://github.com/apache/beam/pull/16051#discussion_r755405431



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -458,7 +458,10 @@ public BundleFinalizer getBundleFinalizer() {
             startFunction.run();
           }
 
-          if (!bundleProcessor.getInboundEndpointApiServiceDescriptors().isEmpty()) {
+          if (request.getProcessBundle().hasElements()) {
+            bundleProcessor.getInboundObserver().accept(request.getProcessBundle().getElements());
+            bundleProcessor.getInboundObserver().awaitCompletion();

Review comment:
       This will block forever if the elements message is malformed and doesn't contain the "terminal" elements, so it would be better to handle this case explicitly within BeamFnDataInboundObserver2 as a separate method.
   
   Also, passing the elements through the queue within BeamFnDataInboundObserver2 and adding the additional synchronization seems wasteful.
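
   A minimal sketch of the "separate method" idea described above, under stated assumptions: `EmbeddedElementsSketch`, `Chunk`, and `multiplex` are hypothetical stand-ins invented for illustration, not Beam's `Elements` proto or the actual `BeamFnDataInboundObserver2` API. It dispatches the embedded elements synchronously on the calling thread (no queue, no extra synchronization) and throws if any expected endpoint never receives its terminal marker, rather than waiting for one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class EmbeddedElementsSketch {

  /** Hypothetical stand-in for one embedded data chunk; not Beam's proto type. */
  static final class Chunk {
    final String transformId;
    final String payload; // empty when the chunk only carries the terminal marker
    final boolean isLast;

    Chunk(String transformId, String payload, boolean isLast) {
      this.transformId = transformId;
      this.payload = payload;
      this.isLast = isLast;
    }
  }

  /**
   * Dispatches chunks directly to their (simulated) receivers and returns the consumed payloads.
   * Fails fast on an unknown transform id or a missing terminal marker instead of blocking.
   */
  static List<String> multiplex(Set<String> expectedTransformIds, List<Chunk> chunks) {
    Set<String> pending = new HashSet<>(expectedTransformIds);
    List<String> consumed = new ArrayList<>();
    for (Chunk chunk : chunks) {
      if (!expectedTransformIds.contains(chunk.transformId)) {
        throw new IllegalStateException(
            "Unable to find inbound data receiver for transform " + chunk.transformId);
      }
      if (!chunk.payload.isEmpty()) {
        consumed.add(chunk.payload);
      }
      if (chunk.isLast) {
        pending.remove(chunk.transformId);
      }
    }
    // Every expected endpoint must have been terminated by the end of the message.
    if (!pending.isEmpty()) {
      throw new IllegalStateException(
          "Embedded elements are incomplete; no terminal marker for " + pending);
    }
    return consumed;
  }

  public static void main(String[] args) {
    Set<String> expected = Collections.singleton("2L");
    // Well-formed message: one data chunk followed by the terminal marker.
    System.out.println(
        multiplex(expected, Arrays.asList(new Chunk("2L", "data", false), new Chunk("2L", "", true))));
    // Malformed message: the terminal marker is missing, so the call throws.
    try {
      multiplex(expected, Collections.singletonList(new Chunk("2L", "data", false)));
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

   Because the whole message is available up front, completeness can be checked once after the loop; there is nothing further to wait for.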







[GitHub] [beam] y1chi commented on pull request #16051: [BEAM-13193] Support StandardProtocols.Enum.CONTROL_REQUEST_ELEMENTS_…

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #16051:
URL: https://github.com/apache/beam/pull/16051#issuecomment-977306855


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on pull request #16051: [BEAM-13193] Support StandardProtocols.Enum.CONTROL_REQUEST_ELEMENTS_…

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #16051:
URL: https://github.com/apache/beam/pull/16051#issuecomment-978422705


   R: @lukecwik 

