Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/02/07 22:26:55 UTC

[GitHub] [beam] y1chi opened a new pull request #16769: [BEAM-13193] Enable process bundle response elements embedding in Jav…

y1chi opened a new pull request #16769:
URL: https://github.com/apache/beam/pull/16769


   …a SDK Harness
   
   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make the review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   `ValidatesRunner` compliance status (on master branch)
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/badge/icon?subject=V1+Streaming">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon?subject=V1+Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/badge/icon?subject=V2+Streaming">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon?subject=Java+8">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon?subject=Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon?subject=Portable+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/badge/icon?subject=Portable">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon?subject=Structured+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon?subject=ValCont">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_PythonUsingJava_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_PythonUsingJava_Dataflow/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_PythonUsingJavaSQL_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_PythonUsingJavaSQL_Dataflow/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_JavaUsingPython_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_JavaUsingPython_Dataflow/lastCompletedBuild/badge/icon">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Examples testing status on various runners
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/badge/icon?subject=V1+Java11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Post-Commit SDK/Transform Integration Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Go</th>
         <th>Java</th>
         <th>Python</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon?subject=3.6">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon?subject=3.7">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon?subject=3.8">
           </a>
         </td>
       </tr>
     </tbody>
   </table>
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>---</th>
         <th>Java</th>
         <th>Python</th>
         <th>Go</th>
         <th>Website</th>
         <th>Whitespace</th>
         <th>Typescript</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Non-portable</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon?subject=Tests">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon?subject=Lint">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon?subject=Docker">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon?subject=Docs">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Portable</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for the trigger phrase, status, and link of every Jenkins job.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] y1chi commented on pull request #16769: [BEAM-13193] Enable process bundle response elements embedding in Jav…

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #16769:
URL: https://github.com/apache/beam/pull/16769#issuecomment-1032008935


   R: @lukecwik 





[GitHub] [beam] lukecwik commented on a change in pull request #16769: [BEAM-13193] Enable process bundle response elements embedding in Jav…

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16769:
URL: https://github.com/apache/beam/pull/16769#discussion_r801296671



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -199,7 +210,14 @@ public void sendBufferedDataAndFinishOutboundStreams() {
           .setIsLast(true);
       entry.getValue().resetStats();
     }
+    if (collectElementsIfNoFlushes && !hasFlushedForBundle) {
+      return bufferedElements.build();
+    }
     outboundObserver.onNext(bufferedElements.build());
+    // This is now at the end of a bundle, so we reset hasFlushedForBundle to prepare for new
+    // bundles.
+    hasFlushedForBundle = false;

Review comment:
       I see now what you're saying. Can you add the `@NotThreadSafe` annotation at the class level, with this comment above it:
   
   The calling thread that invokes `sendBufferedDataAndFinishOutboundStreams` synchronizes on `flushLock`, which effectively stops the periodic flushing from reading or mutating `hasFlushedForBundle` and allows the calling thread to read and mutate `hasFlushedForBundle` safely without creating another memory barrier. Also note that when there is a periodic flushing thread, `flush` is always invoked while synchronized on `flushLock`.
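The locking argument in the suggested comment can be illustrated with a small, self-contained sketch. It is not Beam's `BeamFnDataOutboundAggregator`: the class and method names are hypothetical, elements are plain values rather than encoded `Elements` protos, and `@NotThreadSafe` (from JSR-305) is only mentioned in a comment so the code compiles with the JDK alone. The point it shows is that every read and write of `hasFlushedForBundle` happens while holding `flushLock`, both in the periodic `flush` path and in the end-of-bundle path, and that `collectElementsIfNoFlushes` can be a `final` constructor argument.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified aggregator; real code would annotate the class with
// @javax.annotation.concurrent.NotThreadSafe (JSR-305), omitted here to stay JDK-only.
final class CollectingAggregatorSketch<T> {
  private final Object flushLock = new Object();
  private final boolean collectElementsIfNoFlushes;
  private final Consumer<List<T>> outboundObserver;
  // Guarded by flushLock.
  private List<T> buffer = new ArrayList<>();
  // Only read or written while holding flushLock, so no extra memory barrier is needed.
  private boolean hasFlushedForBundle = false;

  CollectingAggregatorSketch(
      boolean collectElementsIfNoFlushes, Consumer<List<T>> outboundObserver) {
    this.collectElementsIfNoFlushes = collectElementsIfNoFlushes;
    this.outboundObserver = outboundObserver;
  }

  void accept(T element) {
    synchronized (flushLock) {
      buffer.add(element);
    }
  }

  // Invoked by a periodic flushing thread (or when a size limit is reached).
  void flush() {
    synchronized (flushLock) {
      if (buffer.isEmpty()) {
        return;
      }
      outboundObserver.accept(buffer);
      buffer = new ArrayList<>();
      hasFlushedForBundle = true;
    }
  }

  // Ends the bundle. Returns the buffered elements (for embedding in the response) if nothing
  // was flushed during this bundle; otherwise sends them and returns null.
  List<T> sendOrCollectAndFinish() {
    synchronized (flushLock) {
      List<T> remaining = buffer;
      buffer = new ArrayList<>();
      if (collectElementsIfNoFlushes && !hasFlushedForBundle) {
        return remaining;
      }
      // In the real aggregator this final send would also carry the is_last markers.
      outboundObserver.accept(remaining);
      // End of bundle: reset so the next bundle starts fresh.
      hasFlushedForBundle = false;
      return null;
    }
  }
}
```

The sketch takes the lock uniformly for clarity; a production version would likely avoid locking on every `accept` when no periodic flushing thread exists.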







[GitHub] [beam] y1chi commented on a change in pull request #16769: [BEAM-13193] Enable process bundle response elements embedding in Jav…

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #16769:
URL: https://github.com/apache/beam/pull/16769#discussion_r801207453



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -199,7 +210,14 @@ public void sendBufferedDataAndFinishOutboundStreams() {
           .setIsLast(true);
       entry.getValue().resetStats();
     }
+    if (collectElementsIfNoFlushes && !hasFlushedForBundle) {
+      return bufferedElements.build();
+    }
     outboundObserver.onNext(bufferedElements.build());
+    // This is now at the end of a bundle, so we reset hasFlushedForBundle to prepare for new
+    // bundles.
+    hasFlushedForBundle = false;

Review comment:
       Calling convertBufferForTransmission implicitly turns the flushThread into a no-op thread, so there should be only one thread accessing hasFlushedForBundle.







[GitHub] [beam] y1chi commented on pull request #16769: [BEAM-13193] Enable process bundle response elements embedding in Jav…

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #16769:
URL: https://github.com/apache/beam/pull/16769#issuecomment-1033019435


   Run Java PreCommit





[GitHub] [beam] y1chi commented on pull request #16769: [BEAM-13193] Enable process bundle response elements embedding in Jav…

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #16769:
URL: https://github.com/apache/beam/pull/16769#issuecomment-1032135886


   > Why would you need multiple terminal deduplication? Each timer/data endpoint would have a unique descriptor.
   
   OK, I was not sure whether there's a case where a single timer/data endpoint could be sent over multiple ApiServiceDescriptors. But looking again, I guess they'll form unique ProcessBundleDescriptors and hence end up with different BundleHandle instances.





[GitHub] [beam] y1chi commented on a change in pull request #16769: [BEAM-13193] Enable process bundle response elements embedding in Jav…

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #16769:
URL: https://github.com/apache/beam/pull/16769#discussion_r801206678



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -85,10 +87,17 @@ public BeamFnDataOutboundAggregator(
     this.processBundleRequestIdSupplier = processBundleRequestIdSupplier;
     this.bytesWrittenSinceFlush = 0L;
     this.flushLock = new Object();
+    this.hasFlushedForBundle = false;
   }
 
-  /** Starts the flushing daemon thread if data_buffer_time_limit_ms is set. */
-  public void startFlushThread() {
+  /**
+   * Starts the flushing daemon thread if data_buffer_time_limit_ms is set. If
+   * collectElementsIfNoFlushes is true, {@link
+   * #sendOrCollectBufferedDataAndFinishOutboundStreams()} returns the buffered data instead of
+   * sending it to the outboundObserver if no flushes have taken place within this bundle
+   */
+  public void start(boolean collectElementsIfNoFlushes) {
+    this.collectElementsIfNoFlushes = collectElementsIfNoFlushes;

Review comment:
       Sounds good to replay; I've updated accordingly.







[GitHub] [beam] lukecwik commented on pull request #16769: [BEAM-13193] Enable process bundle response elements embedding in Jav…

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #16769:
URL: https://github.com/apache/beam/pull/16769#issuecomment-1032017270


   Why would you need multiple terminal deduplication? Each timer/data endpoint would have a unique descriptor.
   
   Agreed that you can handle only the single-aggregator case for now and defer the rest, but you wouldn't need additional synchronization: you can gather all elements first and embed them in the response if no aggregator returned null; otherwise, pass the ones you got back to their aggregators.
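The gather-then-embed-or-replay approach described in this comment can be sketched as follows. The `FinishableAggregator` interface, its `sendCollected` replay hook, and the `EmbedOrReplaySketch` helper are hypothetical names for illustration, not part of the Beam SDK. The sketch only shows the control flow: finish every aggregator first, embed all elements only if none of them returned null, and otherwise hand each collected batch back to its own aggregator to be sent as usual.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal contract for one outbound aggregator; not Beam's actual API.
interface FinishableAggregator<T> {
  // Returns the buffered elements if nothing was flushed during the bundle, else sends them
  // and returns null.
  List<T> sendOrCollectAndFinish();

  // Sends elements that were previously collected but could not be embedded.
  void sendCollected(List<T> elements);
}

final class EmbedOrReplaySketch {
  private EmbedOrReplaySketch() {}

  // Returns every collected element for embedding in the response, or null if at least one
  // aggregator had already sent its data (in which case the collected batches are replayed).
  static <K, T> List<T> embedOrReplay(Map<K, ? extends FinishableAggregator<T>> aggregators) {
    Map<K, List<T>> collected = new LinkedHashMap<>();
    boolean allCollected = true;
    for (Map.Entry<K, ? extends FinishableAggregator<T>> entry : aggregators.entrySet()) {
      List<T> elements = entry.getValue().sendOrCollectAndFinish();
      if (elements == null) {
        allCollected = false;
      } else {
        collected.put(entry.getKey(), elements);
      }
    }
    if (allCollected) {
      List<T> embedded = new ArrayList<>();
      for (List<T> elements : collected.values()) {
        embedded.addAll(elements);
      }
      return embedded;
    }
    // Mixed outcome: give each collected batch back to its aggregator to send normally.
    for (Map.Entry<K, List<T>> entry : collected.entrySet()) {
      aggregators.get(entry.getKey()).sendCollected(entry.getValue());
    }
    return null;
  }
}
```

Because the results are only combined after every aggregator has finished, the decision itself needs no locking beyond what each aggregator already does internally, which is the point made in the comment.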





[GitHub] [beam] y1chi commented on pull request #16769: [BEAM-13193] Enable process bundle response elements embedding in Jav…

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #16769:
URL: https://github.com/apache/beam/pull/16769#issuecomment-1032008781


   This embedding is enabled only if there's a single BeamFnDataOutboundAggregator for the bundle (i.e., the data/timers ApiServiceDescriptor is a single constant, which seems to be the case at the moment), because supporting multiple data/timers ApiServiceDescriptors would require additional synchronization (and terminal element deduplication?), which doesn't seem worthwhile.
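The gating rule in this comment comes down to two conditions: the runner advertises the elements-embedding capability, and the bundle has exactly one outbound aggregator. A minimal sketch of that check follows. The class name and the URN constant are hypothetical placeholders (the harness derives the real URN via `BeamUrns.getUrn(StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING)`), and the size check reflects the single-aggregator restriction described here rather than code shown in the diff.

```java
import java.util.Map;
import java.util.Set;

final class EmbeddingGateSketch {
  // Hypothetical placeholder; the real URN is obtained from
  // BeamUrns.getUrn(StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING).
  static final String ELEMENTS_EMBEDDING_URN = "example:control_response_elements_embedding";

  private EmbeddingGateSketch() {}

  // Returns true if an aggregator should collect (rather than send) its buffered elements
  // when no flush happened during the bundle.
  static boolean shouldCollectElements(
      Set<String> runnerCapabilities, Map<?, ?> outboundAggregatorsByDescriptor) {
    // Embedding is only attempted for a single outbound ApiServiceDescriptor; supporting
    // several would require gathering (or replaying) the results of every aggregator together.
    return runnerCapabilities.contains(ELEMENTS_EMBEDDING_URN)
        && outboundAggregatorsByDescriptor.size() == 1;
  }
}
```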





[GitHub] [beam] lukecwik commented on a change in pull request #16769: [BEAM-13193] Enable process bundle response elements embedding in Jav…

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16769:
URL: https://github.com/apache/beam/pull/16769#discussion_r801123126



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -199,7 +210,14 @@ public void sendBufferedDataAndFinishOutboundStreams() {
           .setIsLast(true);
       entry.getValue().resetStats();
     }
+    if (collectElementsIfNoFlushes && !hasFlushedForBundle) {
+      return bufferedElements.build();
+    }
     outboundObserver.onNext(bufferedElements.build());
+    // This is now at the end of a bundle, so we reset hasFlushedForBundle to prepare for new
+    // bundles.
+    hasFlushedForBundle = false;

Review comment:
       `hasFlushedForBundle` is accessed outside of `flushLock` and is being read and written by different threads.

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -986,7 +1002,13 @@ void finish() {
       inboundObserver2 =
           BeamFnDataInboundObserver2.forConsumers(getInboundDataEndpoints(), getTimerEndpoints());
       for (BeamFnDataOutboundAggregator aggregator : getOutboundAggregators().values()) {
-        aggregator.startFlushThread();
+        aggregator.start(
+            getRunnerCapabilities()
+                    .contains(
+                        BeamUrns.getUrn(
+                            StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING))
+                // TODO: Handle multiple outbound ApiServiceDescriptors.

Review comment:
       ```suggestion
                   // TODO(BEAM-13193): Handle multiple outbound ApiServiceDescriptors.
   ```

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -85,10 +87,17 @@ public BeamFnDataOutboundAggregator(
     this.processBundleRequestIdSupplier = processBundleRequestIdSupplier;
     this.bytesWrittenSinceFlush = 0L;
     this.flushLock = new Object();
+    this.hasFlushedForBundle = false;
   }
 
-  /** Starts the flushing daemon thread if data_buffer_time_limit_ms is set. */
-  public void startFlushThread() {
+  /**
+   * Starts the flushing daemon thread if data_buffer_time_limit_ms is set. If
+   * collectElementsIfNoFlushes is true, {@link
+   * #sendOrCollectBufferedDataAndFinishOutboundStreams()} returns the buffered data instead of
+   * sending it to the outboundObserver if no flushes have taken place within this bundle
+   */
+  public void start(boolean collectElementsIfNoFlushes) {
+    this.collectElementsIfNoFlushes = collectElementsIfNoFlushes;

Review comment:
       We should know this at construction time, since the runner capabilities are known before we even start processing a bundle; that would allow `collectElementsIfNoFlushes` to be final. This would be difficult without solving the multiple-aggregator case, but that doesn't seem challenging: capture the results for each aggregator, then either embed them in the response if you captured them all, or replay them back to the aggregators if one of them returned null.
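A minimal sketch of deciding the behavior at construction time so the flag can be `final`, assuming the runner capabilities are available as a `Set<String>` when the aggregator is built. `CapabilityAwareAggregator` and `EMBEDDING_URN` are hypothetical; the constant is a placeholder for the value of `BeamUrns.getUrn(StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING)`.

```java
import java.util.Set;

/** Hypothetical sketch: the collect-vs-send decision is fixed when the aggregator is built. */
class CapabilityAwareAggregator {
  // Placeholder standing in for
  // BeamUrns.getUrn(StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING).
  static final String EMBEDDING_URN = "placeholder:control_response_elements_embedding";

  private final boolean collectElementsIfNoFlushes;

  CapabilityAwareAggregator(Set<String> runnerCapabilities) {
    this.collectElementsIfNoFlushes = runnerCapabilities.contains(EMBEDDING_URN);
  }

  /** Starts background flushing; no argument is needed because the behavior is already fixed. */
  void start() {
    // Flush-thread startup would go here; omitted in this sketch.
  }

  boolean collectsElementsIfNoFlushes() {
    return collectElementsIfNoFlushes;
  }
}
```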







[GitHub] [beam] y1chi merged pull request #16769: [BEAM-13193] Enable process bundle response elements embedding in Jav…

Posted by GitBox <gi...@apache.org>.
y1chi merged pull request #16769:
URL: https://github.com/apache/beam/pull/16769


   





[GitHub] [beam] lukecwik commented on a change in pull request #16769: [BEAM-13193] Enable process bundle response elements embedding in Jav…

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16769:
URL: https://github.com/apache/beam/pull/16769#discussion_r801296671



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -199,7 +210,14 @@ public void sendBufferedDataAndFinishOutboundStreams() {
           .setIsLast(true);
       entry.getValue().resetStats();
     }
+    if (collectElementsIfNoFlushes && !hasFlushedForBundle) {
+      return bufferedElements.build();
+    }
     outboundObserver.onNext(bufferedElements.build());
+    // This is now at the end of a bundle, so we reset hasFlushedForBundle to prepare for new
+    // bundles.
+    hasFlushedForBundle = false;

Review comment:
       I'm not following, since `sendOrCollectBufferedDataAndFinishOutboundStreams()` is invoked by the bundle processing thread while `flush()` is invoked by the periodic flush thread.
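The underlying issue is visibility across threads: under the Java Memory Model, a plain `boolean` written by the periodic flush thread without synchronization is not guaranteed to be seen by the bundle processing thread. A hypothetical alternative to taking `flushLock` for the flag alone is an `AtomicBoolean`, whose `getAndSet(false)` reads and resets the flag in one atomic step. `FlushFlag` and its method names are illustrative only.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical holder for the per-bundle "has flushed" flag, safe to share across threads. */
class FlushFlag {
  private final AtomicBoolean hasFlushedForBundle = new AtomicBoolean(false);

  /** Called by the periodic flush thread after it has sent data for the current bundle. */
  void markFlushed() {
    hasFlushedForBundle.set(true);
  }

  /** Called by the bundle processing thread at the end of a bundle: returns and resets the flag. */
  boolean consumeFlushedForBundle() {
    return hasFlushedForBundle.getAndSet(false);
  }
}
```

This only fixes visibility of the flag. If the flush thread can still send buffered data between the bundle thread's check and its hand-off of the remaining elements, the check and the hand-off must still happen under one lock, such as `flushLock`.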







[GitHub] [beam] lukecwik commented on a change in pull request #16769: [BEAM-13193] Enable process bundle response elements embedding in Jav…

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16769:
URL: https://github.com/apache/beam/pull/16769#discussion_r801863810



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
##########
@@ -157,15 +161,18 @@ public void flush() throws IOException {
     if (elements.getDataCount() > 0 || elements.getTimersCount() > 0) {

Review comment:
       Let's make `flush()` private, or package-private and marked with `@VisibleForTesting` if it is used in tests.

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -555,6 +575,38 @@ public BundleFinalizer getBundleFinalizer() {
     }
   }
 
+  private void embedOutboundElementsIfApplicable(
+      ProcessBundleResponse.Builder response, BundleProcessor bundleProcessor) {
+    List<Elements> collectedElements =
+        new ArrayList<>(bundleProcessor.getOutboundAggregators().size());
+    boolean hasFlushedAggregator = false;
+    for (BeamFnDataOutboundAggregator aggregator :

Review comment:
       Add a comment where the outbound aggregator map is instantiated noting that you're using a `LinkedHashMap` because you rely on a stable iteration order.
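`LinkedHashMap` iterates in insertion order, which is what makes it safe to match per-aggregator results back to their aggregators by position. A minimal sketch, assuming a plain string key in place of the real map's key type; `AggregatorRegistry` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical registry of outbound aggregators keyed by endpoint. */
class AggregatorRegistry {
  // Using a LinkedHashMap since callers rely on a stable (insertion) iteration order:
  // results collected in one pass over values() line up by index with a later pass.
  private final Map<String, String> aggregatorsByEndpoint = new LinkedHashMap<>();

  void register(String endpoint, String aggregator) {
    // Re-registering an existing endpoint keeps the original entry and its position.
    aggregatorsByEndpoint.putIfAbsent(endpoint, aggregator);
  }

  List<String> aggregatorsInOrder() {
    return new ArrayList<>(aggregatorsByEndpoint.values());
  }
}
```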

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -555,6 +575,38 @@ public BundleFinalizer getBundleFinalizer() {
     }
   }
 
+  private void embedOutboundElementsIfApplicable(
+      ProcessBundleResponse.Builder response, BundleProcessor bundleProcessor) {
+    List<Elements> collectedElements =
+        new ArrayList<>(bundleProcessor.getOutboundAggregators().size());
+    boolean hasFlushedAggregator = false;
+    for (BeamFnDataOutboundAggregator aggregator :
+        bundleProcessor.getOutboundAggregators().values()) {
+      Elements elements = aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
+      if (elements == null) {
+        hasFlushedAggregator = true;
+      }
+      collectedElements.add(elements);
+    }
+    if (!hasFlushedAggregator && !collectedElements.isEmpty()) {
+      Elements.Builder elementsToEmbed = Elements.newBuilder();
+      for (Elements collectedElement : collectedElements) {
+        elementsToEmbed.mergeFrom(collectedElement);
+      }
+      response.setElements(elementsToEmbed.build());
+    } else {
+      // If there's flushed aggregator, flush all other aggregators as well.

Review comment:
       ```suggestion
         // Since there was at least one flushed aggregator, we have to use the aggregators that were able to successfully collect their elements to emit them and can not send them as part of the ProcessBundleResponse.
   ```
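The all-or-nothing logic under review can be sketched with simplified types, assuming each aggregator returns its buffered elements as a `List<String>` (in place of `Elements`) or `null` if it already flushed. `HypotheticalAggregator`, `send`, and `EmbeddingSketch.embedOrSend` are hypothetical names, not Beam's API. When any aggregator flushed, the elements collected by the others are replayed through those same aggregators, relying on a stable iteration order; an empty aggregator list returns early so nothing empty is embedded.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified view of an outbound aggregator. */
interface HypotheticalAggregator {
  /** Returns the buffered elements, or null if this aggregator already sent data this bundle. */
  List<String> sendOrCollect();

  /** Sends previously collected elements over the data channel. */
  void send(List<String> elements);
}

final class EmbeddingSketch {
  /** Returns the elements to embed in the response, or null if they went over the data channel. */
  static List<String> embedOrSend(List<HypotheticalAggregator> aggregators) {
    if (aggregators.isEmpty()) {
      return null; // nothing to embed
    }
    List<List<String>> collected = new ArrayList<>(aggregators.size());
    boolean hasFlushedAggregator = false;
    for (HypotheticalAggregator aggregator : aggregators) {
      List<String> elements = aggregator.sendOrCollect();
      if (elements == null) {
        hasFlushedAggregator = true;
      }
      collected.add(elements);
    }
    if (!hasFlushedAggregator) {
      // Every aggregator collected its elements, so all of them can be embedded together.
      List<String> merged = new ArrayList<>();
      for (List<String> elements : collected) {
        merged.addAll(elements);
      }
      return merged;
    }
    // At least one aggregator flushed, so the others must emit what they collected.
    for (int i = 0; i < aggregators.size(); i++) {
      if (collected.get(i) != null) {
        aggregators.get(i).send(collected.get(i));
      }
    }
    return null;
  }
}
```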

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -555,6 +575,38 @@ public BundleFinalizer getBundleFinalizer() {
     }
   }
 
+  private void embedOutboundElementsIfApplicable(
+      ProcessBundleResponse.Builder response, BundleProcessor bundleProcessor) {
+    List<Elements> collectedElements =
+        new ArrayList<>(bundleProcessor.getOutboundAggregators().size());
+    boolean hasFlushedAggregator = false;
+    for (BeamFnDataOutboundAggregator aggregator :
+        bundleProcessor.getOutboundAggregators().values()) {
+      Elements elements = aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
+      if (elements == null) {
+        hasFlushedAggregator = true;
+      }
+      collectedElements.add(elements);
+    }
+    if (!hasFlushedAggregator && !collectedElements.isEmpty()) {

Review comment:
       Move the `!collectedElements.isEmpty()` logic to the start of this method with:
   `if (bundleProcessor.getOutboundAggregators().isEmpty()) { return; }`



