Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/02/18 19:20:52 UTC

[GitHub] [beam] dpcollins-google opened a new pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

dpcollins-google opened a new pull request #16901:
URL: https://github.com/apache/beam/pull/16901


   Currently, because the queue is limited only by the number of elements, there can be up to (num threads + queue size) elements outstanding at a time, which for large work items will almost certainly OOM the worker.
   
   This change both makes this limit explicit and adds a 500 MB memory limit on outstanding WorkItems to push back on windmill before workers run out of memory.
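
A minimal sketch of the idea described above, using only JDK classes. It is not the code in this pull request (the diff further down uses Guava's `Monitor`); the class name `ByteLimitedExecutor`, its methods, and the rule that an oversized item is admitted when nothing else is outstanding are all assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: blocks submitters while element or byte limits are exceeded. */
final class ByteLimitedExecutor {
  private final ExecutorService executor;
  private final int maxElements;
  private final long maxBytes;
  private int elementsOutstanding = 0;
  private long bytesOutstanding = 0;

  ByteLimitedExecutor(int threads, int maxElements, long maxBytes) {
    this.executor = Executors.newFixedThreadPool(threads);
    this.maxElements = maxElements;
    this.maxBytes = maxBytes;
  }

  /** Blocks the caller until the work fits under both limits, then schedules it. */
  void execute(Runnable work, long workBytes) {
    synchronized (this) {
      // Assumption: admit any item when nothing is outstanding, so that a single
      // item larger than maxBytes cannot block forever.
      while (elementsOutstanding > 0
          && (elementsOutstanding >= maxElements || bytesOutstanding + workBytes > maxBytes)) {
        try {
          wait();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("interrupted while waiting for capacity", e);
        }
      }
      elementsOutstanding++;
      bytesOutstanding += workBytes;
    }
    try {
      executor.execute(() -> {
        try {
          work.run();
        } finally {
          release(workBytes); // give the budget back even if the work throws
        }
      });
    } catch (RuntimeException e) {
      release(workBytes); // the pool rejected the task, so it never ran
      throw e;
    }
  }

  private synchronized void release(long workBytes) {
    elementsOutstanding--;
    bytesOutstanding -= workBytes;
    notifyAll(); // wake submitters blocked in execute()
  }

  synchronized long bytesOutstanding() {
    return bytesOutstanding;
  }

  synchronized int elementsOutstanding() {
    return elementsOutstanding;
  }

  void shutdownAndWait() {
    executor.shutdown();
    try {
      executor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    ByteLimitedExecutor ex = new ByteLimitedExecutor(4, 10, 100);
    for (int i = 0; i < 20; i++) {
      ex.execute(() -> {}, 40); // each fake work item counts as 40 bytes
    }
    ex.shutdownAndWait();
    System.out.println("bytes outstanding after drain: " + ex.bytesOutstanding());
  }
}
```

The byte budget is charged before the task is handed to the pool and released only when the task finishes, so it covers queued and active work together, not just the queue.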
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dpcollins-google commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r813206968



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -785,6 +779,18 @@ private int chooseMaximumNumberOfThreads() {
     return MAX_PROCESSING_THREADS;
   }
 
+  private int chooseMaximumBundlesOutstanding() {
+    return Math.max(options.getMaxBundlesFromWindmillOutstanding(), chooseMaximumNumberOfThreads());

Review comment:
       The previous limit was 100 + MAX_PROCESSING_THREADS (the constant 300) and didn't take into account the actual number of processing threads.
   
   This keeps the identical behavior.







[GitHub] [beam] kennknowles merged pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
kennknowles merged pull request #16901:
URL: https://github.com/apache/beam/pull/16901


   





[GitHub] [beam] kennknowles commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r812332825



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -195,6 +195,8 @@
   // retrieving extra work from Windmill without working on it, leading to better
   // prioritization / utilization.
   static final int MAX_WORK_UNITS_QUEUED = 100;
+  // Maximum bytes of WorkItems being processed in the work queue at a time.
+  static final int MAX_WORK_UNITS_BYTES = 500 << 20; // 500MB

Review comment:
       Still, making it a pipeline option will allow much more agile adjustments by the service.







[GitHub] [beam] dpcollins-google commented on pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on pull request #16901:
URL: https://github.com/apache/beam/pull/16901#issuecomment-1045056126


   R: @kennknowles 





[GitHub] [beam] dpcollins-google commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r812324231



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -195,6 +195,8 @@
   // retrieving extra work from Windmill without working on it, leading to better
   // prioritization / utilization.
   static final int MAX_WORK_UNITS_QUEUED = 100;
+  // Maximum bytes of WorkItems being processed in the work queue at a time.
+  static final int MAX_WORK_UNITS_BYTES = 500 << 20; // 500MB

Review comment:
       This would only throttle pipelines that currently run successfully with >500 MB outstanding at a time and that recover from that state faster than windmill can deliver new data to the worker. I'd expect that to be a very small percentage of pipelines: effectively only those whose work items are consistently about (NON_OOM_USABLE_MEMORY / 400) in size. It would be rare for a pipeline to be tuned that precisely, and also rare for one to process 500 MB of data before windmill could deliver more.







[GitHub] [beam] scwhittle commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
scwhittle commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r812672510



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -195,6 +195,8 @@
   // retrieving extra work from Windmill without working on it, leading to better
   // prioritization / utilization.
   static final int MAX_WORK_UNITS_QUEUED = 100;
+  // Maximum bytes of WorkItems being processed in the work queue at a time.
+  static final int MAX_WORK_UNITS_BYTES = 500 << 20; // 500MB

Review comment:
       The argument that pipelines would always OOM if they exceeded 500MB active breaks down due to pipelines with limited parallelism. Streaming dataflow does not return additional work for keys that are already active on the worker.
   
   For example, if you have a pipeline writing to files with fixed shards and there are only 8 shards per worker, the current queued/active is 800MB (due to the 100MB work bundle limit). Since the Java harness has ~4GB of memory by default, and can be configured with a lot more, such pipelines would not be OOMing.
   
   So I'm still concerned that setting the default to 500MB could limit pipelines that previously worked.  Now that it is an option it is at least something we can tune but it still seems like it could cause issues and should perhaps be scaled based upon machine memory.
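
The arithmetic in the example above, and one way a limit could be scaled with memory, can be sketched as follows. This is an illustration only: the class `ScaledByteLimit`, the 0.5 fraction, and the 500 MB floor are hypothetical choices, not values proposed in this thread.

```java
/** Hypothetical sketch: compare a fixed byte limit with one scaled to the JVM heap. */
final class ScaledByteLimit {
  static final long MB = 1L << 20;

  /** Bytes held when every active key has one full-size bundle on the worker. */
  static long activeBytes(int activeKeys, long bundleBytes) {
    return activeKeys * bundleBytes;
  }

  /** Limit as a fraction of the heap, never below a fixed floor (both are assumptions). */
  static long scaledLimit(long heapBytes, double fraction, long floorBytes) {
    return Math.max(floorBytes, (long) (heapBytes * fraction));
  }

  public static void main(String[] args) {
    long active = activeBytes(8, 100 * MB); // 8 shards, 100MB bundle limit
    long fixedLimit = 500 * MB;
    long heap = Runtime.getRuntime().maxMemory(); // max heap of this JVM
    long scaled = scaledLimit(heap, 0.5, 500 * MB);

    System.out.println("active MB: " + active / MB);
    System.out.println("throttled by fixed limit: " + (active > fixedLimit));
    System.out.println("throttled by scaled limit: " + (active > scaled));
  }
}
```

With 8 active keys and 100MB bundles the pipeline holds 800MB, which is above a fixed 500MB limit but below half of a 4GB heap.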







[GitHub] [beam] kennknowles commented on pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #16901:
URL: https://github.com/apache/beam/pull/16901#issuecomment-1048002925


   I want to at least CC @scwhittle on this while I review it. I can check basic things but I don't have a sense of the perf impact.





[GitHub] [beam] dpcollins-google commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r813149643



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -195,6 +195,8 @@
   // retrieving extra work from Windmill without working on it, leading to better
   // prioritization / utilization.
   static final int MAX_WORK_UNITS_QUEUED = 100;
+  // Maximum bytes of WorkItems being processed in the work queue at a time.
+  static final int MAX_WORK_UNITS_BYTES = 500 << 20; // 500MB

Review comment:
       What would your preferred default limit be? The default memory limit for dataflow streaming engine workers is 8G (n1-standard-2), would you prefer a 2G or 4G default for such machines?
   
   If you'd prefer this to be scaled on available jvm memory, what would the preferred default fraction be?
   
   > For elements we have separate limits for the queue and for active (since there are a limited # of active threads each processing one work item).
   
   We don't really. We have a number of threads and a number of queue slots, so threads + queue slots = outstanding. With the default of 100 work threads, if we didn't limit active memory, we would still have this problem.
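
As a worked example of that bound, assuming the 100 work threads mentioned above, the `MAX_WORK_UNITS_QUEUED = 100` queue slots from the diff, and the 100MB work bundle limit mentioned earlier in the thread (the class and method names here are hypothetical):

```java
/** Hypothetical sketch: worst-case memory implied by an element-count-only limit. */
final class OutstandingBound {
  static final long MB = 1L << 20;

  /** Work items that can be held at once: one per thread plus one per queue slot. */
  static int maxOutstanding(int threads, int queueSlots) {
    return threads + queueSlots;
  }

  /** Upper bound on bytes held if every outstanding item is as large as allowed. */
  static long worstCaseBytes(int outstanding, long maxItemBytes) {
    return outstanding * maxItemBytes;
  }

  public static void main(String[] args) {
    int outstanding = maxOutstanding(100, 100);
    long bytes = worstCaseBytes(outstanding, 100 * MB);
    System.out.println(outstanding + " items, up to " + bytes / MB + " MB");
  }
}
```

A count-only limit therefore bounds memory only as tightly as the largest work item allows, which is why a separate byte limit is being added.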







[GitHub] [beam] dpcollins-google commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r813218264



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
##########
@@ -18,62 +18,126 @@
 package org.apache.beam.runners.dataflow.worker.util;
 
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor.Guard;
 
-/** Executor that blocks on execute() if its queue is full. */
+/** An executor for executing work on windmill items. */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class BoundedQueueExecutor extends ThreadPoolExecutor {
-  private static class ReducableSemaphore extends Semaphore {
-    ReducableSemaphore(int permits) {
-      super(permits);
-    }
+public class BoundedQueueExecutor {
+  private final ThreadPoolExecutor executor;
+  private final int maximumElementsOutstanding;
+  private final long maximumBytesOutstanding;
 
-    @Override
-    public void reducePermits(int permits) {
-      super.reducePermits(permits);
-    }
-  }
-
-  private ReducableSemaphore semaphore;
+  private final Monitor monitor = new Monitor();
+  private int elementsOutstanding = 0;
+  private long bytesOutstanding = 0;
 
   public BoundedQueueExecutor(
       int maximumPoolSize,
       long keepAliveTime,
       TimeUnit unit,
-      int maximumQueueSize,
+      int maximumElementsOutstanding,
+      long maximumBytesOutstanding,
       ThreadFactory threadFactory) {
-    super(
-        maximumPoolSize,
-        maximumPoolSize,
-        keepAliveTime,
-        unit,
-        new LinkedBlockingQueue<Runnable>(),
-        threadFactory);
-    this.semaphore = new ReducableSemaphore(maximumQueueSize);
-    allowCoreThreadTimeOut(true);
+    executor =
+        new ThreadPoolExecutor(
+            maximumPoolSize,
+            maximumPoolSize,
+            keepAliveTime,
+            unit,
+            new LinkedBlockingQueue<>(),
+            threadFactory);
+    executor.allowCoreThreadTimeOut(true);
+    this.maximumElementsOutstanding = maximumElementsOutstanding;
+    this.maximumBytesOutstanding = maximumBytesOutstanding;
   }
 
-  // Before adding a Runnable to the queue, acquire the semaphore.
-  @Override
-  public void execute(Runnable r) {
-    semaphore.acquireUninterruptibly();
-    super.execute(r);
+  // Before adding a Work to the queue, check that there are enough bytes of space or no other
+  // outstanding elements of work.
+  public void execute(Runnable work, long workBytes) {
+    monitor.enterWhenUninterruptibly(
+        new Guard(monitor) {
+          @Override
+          public boolean isSatisfied() {
+            return elementsOutstanding == 0
+                || (bytesAvailable() >= workBytes
+                    && elementsOutstanding < maximumElementsOutstanding);
+          }
+        });
+    Runnable workRunnable =
+        () -> {
+          try {
+            work.run();
+          } finally {
+            monitor.enter();
+            --elementsOutstanding;
+            bytesOutstanding -= workBytes;
+            monitor.leave();
+          }
+        };
+    try {
+      bytesOutstanding += workBytes;
+      ++elementsOutstanding;
+      executor.execute(workRunnable);
+    } finally {
+      monitor.leave();
+    }
   }
 
   // Forcibly add something to the queue, ignoring the length limit.
   public void forceExecute(Runnable r) {
-    semaphore.reducePermits(1);
-    super.execute(r);
+    executor.execute(r);

Review comment:
       This is now enforced.







[GitHub] [beam] dpcollins-google commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r814029376



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
##########
@@ -18,62 +18,135 @@
 package org.apache.beam.runners.dataflow.worker.util;
 
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor.Guard;
 
-/** Executor that blocks on execute() if its queue is full. */
+/** An executor for executing work on windmill items. */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class BoundedQueueExecutor extends ThreadPoolExecutor {
-  private static class ReducableSemaphore extends Semaphore {
-    ReducableSemaphore(int permits) {
-      super(permits);
-    }
-
-    @Override
-    public void reducePermits(int permits) {
-      super.reducePermits(permits);
-    }
-  }
+public class BoundedQueueExecutor {
+  private final ThreadPoolExecutor executor;
+  private final int maximumElementsOutstanding;
+  private final long maximumBytesOutstanding;
 
-  private ReducableSemaphore semaphore;
+  private final Monitor monitor = new Monitor();
+  private int elementsOutstanding = 0;
+  private long bytesOutstanding = 0;
 
   public BoundedQueueExecutor(
       int maximumPoolSize,
       long keepAliveTime,
       TimeUnit unit,
-      int maximumQueueSize,
+      int maximumElementsOutstanding,
+      long maximumBytesOutstanding,
       ThreadFactory threadFactory) {
-    super(
-        maximumPoolSize,
-        maximumPoolSize,
-        keepAliveTime,
-        unit,
-        new LinkedBlockingQueue<Runnable>(),
-        threadFactory);
-    this.semaphore = new ReducableSemaphore(maximumQueueSize);
-    allowCoreThreadTimeOut(true);
+    executor =
+        new ThreadPoolExecutor(
+            maximumPoolSize,
+            maximumPoolSize,
+            keepAliveTime,
+            unit,
+            new LinkedBlockingQueue<>(),
+            threadFactory);
+    executor.allowCoreThreadTimeOut(true);
+    this.maximumElementsOutstanding = maximumElementsOutstanding;
+    this.maximumBytesOutstanding = maximumBytesOutstanding;
+  }
+
+  // Before adding a Work to the queue, check that there are enough bytes of space or no other
+  // outstanding elements of work.
+  public void execute(Runnable work, long workBytes) {
+    monitor.enterWhenUninterruptibly(
+        new Guard(monitor) {
+          @Override
+          public boolean isSatisfied() {
+            return elementsOutstanding == 0
+                || (bytesAvailable() >= workBytes
+                    && elementsOutstanding < maximumElementsOutstanding);
+          }
+        });
+    executeLockHeld(work, workBytes);
   }
 
-  // Before adding a Runnable to the queue, acquire the semaphore.
-  @Override
-  public void execute(Runnable r) {
-    semaphore.acquireUninterruptibly();
-    super.execute(r);
+  public void forceExecute(Runnable work, long workBytes) {

Review comment:
       Done.







[GitHub] [beam] kennknowles commented on pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #16901:
URL: https://github.com/apache/beam/pull/16901#issuecomment-1050132139


   nice improvement





[GitHub] [beam] dpcollins-google commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r812422275



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -195,6 +195,8 @@
   // retrieving extra work from Windmill without working on it, leading to better
   // prioritization / utilization.
   static final int MAX_WORK_UNITS_QUEUED = 100;
+  // Maximum bytes of WorkItems being processed in the work queue at a time.
+  static final int MAX_WORK_UNITS_BYTES = 500 << 20; // 500MB

Review comment:
       Done.







[GitHub] [beam] scwhittle commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
scwhittle commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r812676393



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -195,6 +195,8 @@
   // retrieving extra work from Windmill without working on it, leading to better
   // prioritization / utilization.
   static final int MAX_WORK_UNITS_QUEUED = 100;
+  // Maximum bytes of WorkItems being processed in the work queue at a time.
+  static final int MAX_WORK_UNITS_BYTES = 500 << 20; // 500MB

Review comment:
       For elements we have separate limits for the queue and for active (since there are a limited # of active threads each processing one work item).
   
   Should we distinguish between queued and active bytes? It seems safer to give queued bytes a lower default limit, whereas a limit on active bytes might throttle pipelines that currently work.
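
The split suggested in this comment could look roughly like the sketch below. This is an illustration only: the PR as quoted in this thread tracks a single `bytesOutstanding` counter, and the class `SplitByteBudgetSketch` and all of its methods are hypothetical. It assumes, as the quoted `Guard` does, that an item is always admitted when nothing else is outstanding so that oversized items can still make progress.

```java
final class SplitByteBudgetSketch {
  private final long maxQueuedBytes;
  private final long maxActiveBytes;
  private long queuedBytes = 0;
  private long activeBytes = 0;

  public SplitByteBudgetSketch(long maxQueuedBytes, long maxActiveBytes) {
    this.maxQueuedBytes = maxQueuedBytes;
    this.maxActiveBytes = maxActiveBytes;
  }

  // Reserves queue budget for an item. An empty queue always admits one item.
  public synchronized boolean tryEnqueue(long bytes) {
    if (queuedBytes != 0 && queuedBytes + bytes > maxQueuedBytes) {
      return false;
    }
    queuedBytes += bytes;
    return true;
  }

  // Moves a previously enqueued item from the queued budget to the active budget.
  public synchronized boolean tryStart(long bytes) {
    if (activeBytes != 0 && activeBytes + bytes > maxActiveBytes) {
      return false;
    }
    queuedBytes -= bytes;
    activeBytes += bytes;
    return true;
  }

  // Releases the active budget held by a finished item.
  public synchronized void finish(long bytes) {
    activeBytes -= bytes;
  }

  public synchronized long queuedBytes() {
    return queuedBytes;
  }

  public synchronized long activeBytes() {
    return activeBytes;
  }
}
```

A caller would reserve queue budget before handing work to the executor, move it to the active budget when a thread picks the item up, and release it in a `finally` block when the work completes. A small queued limit then bounds prefetching without capping how much memory running work may use.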







[GitHub] [beam] dpcollins-google commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r814031771



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
##########
@@ -18,62 +18,135 @@
 package org.apache.beam.runners.dataflow.worker.util;
 
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor.Guard;
 
-/** Executor that blocks on execute() if its queue is full. */
+/** An executor for executing work on windmill items. */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class BoundedQueueExecutor extends ThreadPoolExecutor {
-  private static class ReducableSemaphore extends Semaphore {
-    ReducableSemaphore(int permits) {
-      super(permits);
-    }
-
-    @Override
-    public void reducePermits(int permits) {
-      super.reducePermits(permits);
-    }
-  }
+public class BoundedQueueExecutor {
+  private final ThreadPoolExecutor executor;
+  private final int maximumElementsOutstanding;
+  private final long maximumBytesOutstanding;
 
-  private ReducableSemaphore semaphore;
+  private final Monitor monitor = new Monitor();
+  private int elementsOutstanding = 0;
+  private long bytesOutstanding = 0;
 
   public BoundedQueueExecutor(
       int maximumPoolSize,
       long keepAliveTime,
       TimeUnit unit,
-      int maximumQueueSize,
+      int maximumElementsOutstanding,
+      long maximumBytesOutstanding,
       ThreadFactory threadFactory) {
-    super(
-        maximumPoolSize,
-        maximumPoolSize,
-        keepAliveTime,
-        unit,
-        new LinkedBlockingQueue<Runnable>(),
-        threadFactory);
-    this.semaphore = new ReducableSemaphore(maximumQueueSize);
-    allowCoreThreadTimeOut(true);
+    executor =
+        new ThreadPoolExecutor(
+            maximumPoolSize,
+            maximumPoolSize,
+            keepAliveTime,
+            unit,
+            new LinkedBlockingQueue<>(),
+            threadFactory);
+    executor.allowCoreThreadTimeOut(true);
+    this.maximumElementsOutstanding = maximumElementsOutstanding;
+    this.maximumBytesOutstanding = maximumBytesOutstanding;
+  }
+
+  // Before adding a Work to the queue, check that there are enough bytes of space or no other
+  // outstanding elements of work.
+  public void execute(Runnable work, long workBytes) {
+    monitor.enterWhenUninterruptibly(
+        new Guard(monitor) {
+          @Override
+          public boolean isSatisfied() {
+            return elementsOutstanding == 0
+                || (bytesAvailable() >= workBytes
+                    && elementsOutstanding < maximumElementsOutstanding);
+          }
+        });
+    executeLockHeld(work, workBytes);
   }
 
-  // Before adding a Runnable to the queue, acquire the semaphore.
-  @Override
-  public void execute(Runnable r) {
-    semaphore.acquireUninterruptibly();
-    super.execute(r);
+  public void forceExecute(Runnable work, long workBytes) {
+    monitor.enter();
+    executeLockHeld(work, workBytes);
   }
 
   // Forcibly add something to the queue, ignoring the length limit.
   public void forceExecute(Runnable r) {
-    semaphore.reducePermits(1);
-    super.execute(r);
+    forceExecute(r, 0);

Review comment:
       Done.

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
##########
@@ -18,62 +18,135 @@
 package org.apache.beam.runners.dataflow.worker.util;
 
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor.Guard;
 
-/** Executor that blocks on execute() if its queue is full. */
+/** An executor for executing work on windmill items. */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class BoundedQueueExecutor extends ThreadPoolExecutor {
-  private static class ReducableSemaphore extends Semaphore {
-    ReducableSemaphore(int permits) {
-      super(permits);
-    }
-
-    @Override
-    public void reducePermits(int permits) {
-      super.reducePermits(permits);
-    }
-  }
+public class BoundedQueueExecutor {
+  private final ThreadPoolExecutor executor;
+  private final int maximumElementsOutstanding;
+  private final long maximumBytesOutstanding;
 
-  private ReducableSemaphore semaphore;
+  private final Monitor monitor = new Monitor();
+  private int elementsOutstanding = 0;
+  private long bytesOutstanding = 0;
 
   public BoundedQueueExecutor(
       int maximumPoolSize,
       long keepAliveTime,
       TimeUnit unit,
-      int maximumQueueSize,
+      int maximumElementsOutstanding,
+      long maximumBytesOutstanding,
       ThreadFactory threadFactory) {
-    super(
-        maximumPoolSize,
-        maximumPoolSize,
-        keepAliveTime,
-        unit,
-        new LinkedBlockingQueue<Runnable>(),
-        threadFactory);
-    this.semaphore = new ReducableSemaphore(maximumQueueSize);
-    allowCoreThreadTimeOut(true);
+    executor =
+        new ThreadPoolExecutor(
+            maximumPoolSize,
+            maximumPoolSize,
+            keepAliveTime,
+            unit,
+            new LinkedBlockingQueue<>(),
+            threadFactory);
+    executor.allowCoreThreadTimeOut(true);
+    this.maximumElementsOutstanding = maximumElementsOutstanding;
+    this.maximumBytesOutstanding = maximumBytesOutstanding;
+  }
+
+  // Before adding a Work to the queue, check that there are enough bytes of space or no other
+  // outstanding elements of work.
+  public void execute(Runnable work, long workBytes) {
+    monitor.enterWhenUninterruptibly(
+        new Guard(monitor) {
+          @Override
+          public boolean isSatisfied() {
+            return elementsOutstanding == 0
+                || (bytesAvailable() >= workBytes
+                    && elementsOutstanding < maximumElementsOutstanding);
+          }
+        });
+    executeLockHeld(work, workBytes);
   }
 
-  // Before adding a Runnable to the queue, acquire the semaphore.
-  @Override
-  public void execute(Runnable r) {
-    semaphore.acquireUninterruptibly();
-    super.execute(r);
+  public void forceExecute(Runnable work, long workBytes) {
+    monitor.enter();
+    executeLockHeld(work, workBytes);
   }
 
   // Forcibly add something to the queue, ignoring the length limit.
   public void forceExecute(Runnable r) {
-    semaphore.reducePermits(1);
-    super.execute(r);
+    forceExecute(r, 0);
+  }
+
+  public void shutdown() throws InterruptedException {
+    executor.shutdown();
+    if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
+      throw new RuntimeException("Work executor did not terminate within 5 minutes");
+    }
+  }
+
+  public boolean executorQueueIsEmpty() {
+    return executor.getQueue().isEmpty();
+  }
+
+  public String summaryHtml() {
+    monitor.enter();
+    try {
+      StringBuilder builder = new StringBuilder();
+      builder.append("Worker Threads: ");
+      builder.append(executor.getPoolSize());
+      builder.append("/");
+      builder.append(executor.getMaximumPoolSize());
+      builder.append("<br>/n");
+
+      builder.append("Active Threads: ");
+      builder.append(executor.getActiveCount());
+      builder.append("<br>/n");
+
+      builder.append("Work Queue Size: ");
+      builder.append(elementsOutstanding);
+      builder.append("/");
+      builder.append(maximumElementsOutstanding);
+      builder.append("<br>/n");
+
+      builder.append("Work Queue Bytes: ");
+      builder.append(bytesOutstanding);
+      builder.append("/");
+      builder.append(maximumBytesOutstanding);
+      builder.append("<br>/n");
+
+      return builder.toString();
+    } finally {
+      monitor.leave();
+    }
+  }
+
+  private void executeLockHeld(Runnable work, long workBytes) {
+    Runnable workRunnable =
+        () -> {
+          try {
+            work.run();
+          } finally {
+            monitor.enter();
+            --elementsOutstanding;
+            bytesOutstanding -= workBytes;
+            monitor.leave();
+          }
+        };
+    try {
+      bytesOutstanding += workBytes;
+      ++elementsOutstanding;
+      executor.execute(workRunnable);
+    } finally {
+      monitor.leave();

Review comment:
       Done.

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
##########
@@ -18,62 +18,135 @@
 package org.apache.beam.runners.dataflow.worker.util;
 
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor.Guard;
 
-/** Executor that blocks on execute() if its queue is full. */
+/** An executor for executing work on windmill items. */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class BoundedQueueExecutor extends ThreadPoolExecutor {
-  private static class ReducableSemaphore extends Semaphore {
-    ReducableSemaphore(int permits) {
-      super(permits);
-    }
-
-    @Override
-    public void reducePermits(int permits) {
-      super.reducePermits(permits);
-    }
-  }
+public class BoundedQueueExecutor {
+  private final ThreadPoolExecutor executor;
+  private final int maximumElementsOutstanding;
+  private final long maximumBytesOutstanding;
 
-  private ReducableSemaphore semaphore;
+  private final Monitor monitor = new Monitor();
+  private int elementsOutstanding = 0;
+  private long bytesOutstanding = 0;
 
   public BoundedQueueExecutor(
       int maximumPoolSize,
       long keepAliveTime,
       TimeUnit unit,
-      int maximumQueueSize,
+      int maximumElementsOutstanding,
+      long maximumBytesOutstanding,
       ThreadFactory threadFactory) {
-    super(
-        maximumPoolSize,
-        maximumPoolSize,
-        keepAliveTime,
-        unit,
-        new LinkedBlockingQueue<Runnable>(),
-        threadFactory);
-    this.semaphore = new ReducableSemaphore(maximumQueueSize);
-    allowCoreThreadTimeOut(true);
+    executor =
+        new ThreadPoolExecutor(
+            maximumPoolSize,
+            maximumPoolSize,
+            keepAliveTime,
+            unit,
+            new LinkedBlockingQueue<>(),
+            threadFactory);
+    executor.allowCoreThreadTimeOut(true);
+    this.maximumElementsOutstanding = maximumElementsOutstanding;
+    this.maximumBytesOutstanding = maximumBytesOutstanding;
+  }
+
+  // Before adding a Work to the queue, check that there are enough bytes of space or no other
+  // outstanding elements of work.
+  public void execute(Runnable work, long workBytes) {
+    monitor.enterWhenUninterruptibly(
+        new Guard(monitor) {
+          @Override
+          public boolean isSatisfied() {
+            return elementsOutstanding == 0
+                || (bytesAvailable() >= workBytes
+                    && elementsOutstanding < maximumElementsOutstanding);
+          }
+        });
+    executeLockHeld(work, workBytes);
   }
 
-  // Before adding a Runnable to the queue, acquire the semaphore.
-  @Override
-  public void execute(Runnable r) {
-    semaphore.acquireUninterruptibly();
-    super.execute(r);
+  public void forceExecute(Runnable work, long workBytes) {
+    monitor.enter();
+    executeLockHeld(work, workBytes);
   }
 
   // Forcibly add something to the queue, ignoring the length limit.
   public void forceExecute(Runnable r) {
-    semaphore.reducePermits(1);
-    super.execute(r);
+    forceExecute(r, 0);
+  }
+
+  public void shutdown() throws InterruptedException {
+    executor.shutdown();
+    if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
+      throw new RuntimeException("Work executor did not terminate within 5 minutes");
+    }
+  }
+
+  public boolean executorQueueIsEmpty() {
+    return executor.getQueue().isEmpty();
+  }
+
+  public String summaryHtml() {
+    monitor.enter();
+    try {
+      StringBuilder builder = new StringBuilder();
+      builder.append("Worker Threads: ");
+      builder.append(executor.getPoolSize());
+      builder.append("/");
+      builder.append(executor.getMaximumPoolSize());
+      builder.append("<br>/n");
+
+      builder.append("Active Threads: ");
+      builder.append(executor.getActiveCount());
+      builder.append("<br>/n");
+
+      builder.append("Work Queue Size: ");
+      builder.append(elementsOutstanding);
+      builder.append("/");
+      builder.append(maximumElementsOutstanding);
+      builder.append("<br>/n");
+
+      builder.append("Work Queue Bytes: ");
+      builder.append(bytesOutstanding);
+      builder.append("/");
+      builder.append(maximumBytesOutstanding);
+      builder.append("<br>/n");
+
+      return builder.toString();
+    } finally {
+      monitor.leave();
+    }
+  }
+
+  private void executeLockHeld(Runnable work, long workBytes) {
+    Runnable workRunnable =
+        () -> {
+          try {
+            work.run();
+          } finally {
+            monitor.enter();
+            --elementsOutstanding;
+            bytesOutstanding -= workBytes;
+            monitor.leave();
+          }
+        };
+    try {
+      bytesOutstanding += workBytes;
+      ++elementsOutstanding;
+      executor.execute(workRunnable);

Review comment:
       Done.







[GitHub] [beam] dpcollins-google commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r812321924



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
##########
@@ -18,62 +18,126 @@
 package org.apache.beam.runners.dataflow.worker.util;
 
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor.Guard;
 
-/** Executor that blocks on execute() if its queue is full. */
+/** An executor for executing work on windmill items. */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class BoundedQueueExecutor extends ThreadPoolExecutor {
-  private static class ReducableSemaphore extends Semaphore {
-    ReducableSemaphore(int permits) {
-      super(permits);
-    }
+public class BoundedQueueExecutor {
+  private final ThreadPoolExecutor executor;
+  private final int maximumElementsOutstanding;
+  private final long maximumBytesOutstanding;
 
-    @Override
-    public void reducePermits(int permits) {
-      super.reducePermits(permits);
-    }
-  }
-
-  private ReducableSemaphore semaphore;
+  private final Monitor monitor = new Monitor();
+  private int elementsOutstanding = 0;
+  private long bytesOutstanding = 0;
 
   public BoundedQueueExecutor(
       int maximumPoolSize,
       long keepAliveTime,
       TimeUnit unit,
-      int maximumQueueSize,
+      int maximumElementsOutstanding,
+      long maximumBytesOutstanding,
       ThreadFactory threadFactory) {
-    super(
-        maximumPoolSize,
-        maximumPoolSize,
-        keepAliveTime,
-        unit,
-        new LinkedBlockingQueue<Runnable>(),
-        threadFactory);
-    this.semaphore = new ReducableSemaphore(maximumQueueSize);
-    allowCoreThreadTimeOut(true);
+    executor =
+        new ThreadPoolExecutor(
+            maximumPoolSize,
+            maximumPoolSize,
+            keepAliveTime,
+            unit,
+            new LinkedBlockingQueue<>(),
+            threadFactory);
+    executor.allowCoreThreadTimeOut(true);
+    this.maximumElementsOutstanding = maximumElementsOutstanding;
+    this.maximumBytesOutstanding = maximumBytesOutstanding;
   }
 
-  // Before adding a Runnable to the queue, acquire the semaphore.
-  @Override
-  public void execute(Runnable r) {
-    semaphore.acquireUninterruptibly();
-    super.execute(r);
+  // Before adding a Work to the queue, check that there are enough bytes of space or no other
+  // outstanding elements of work.
+  public void execute(Runnable work, long workBytes) {
+    monitor.enterWhenUninterruptibly(
+        new Guard(monitor) {
+          @Override
+          public boolean isSatisfied() {
+            return elementsOutstanding == 0
+                || (bytesAvailable() >= workBytes
+                    && elementsOutstanding < maximumElementsOutstanding);
+          }
+        });
+    Runnable workRunnable =
+        () -> {
+          try {
+            work.run();
+          } finally {
+            monitor.enter();
+            --elementsOutstanding;
+            bytesOutstanding -= workBytes;
+            monitor.leave();
+          }
+        };
+    try {
+      bytesOutstanding += workBytes;
+      ++elementsOutstanding;
+      executor.execute(workRunnable);
+    } finally {
+      monitor.leave();
+    }
   }
 
   // Forcibly add something to the queue, ignoring the length limit.
   public void forceExecute(Runnable r) {
-    semaphore.reducePermits(1);
-    super.execute(r);
+    executor.execute(r);

Review comment:
       You can't increment bytes (most usages of this don't correspond to work items).
   
   In general, I think they shouldn't, and we should try to remove the `forceExecute` method; this should be a queue for just work items from windmill, not reused as a generic executor.







[GitHub] [beam] scwhittle commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
scwhittle commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r812308098



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -195,6 +195,8 @@
   // retrieving extra work from Windmill without working on it, leading to better
   // prioritization / utilization.
   static final int MAX_WORK_UNITS_QUEUED = 100;
+  // Maximum bytes of WorkItems being processed in the work queue at a time.
+  static final int MAX_WORK_UNITS_BYTES = 500 << 20; // 500MB

Review comment:
       I'm worried this could throttle existing pipelines that weren't OOMing, or that were OOMing until the user tuned them by increasing the worker size.
   
   It seems safer to make this an option, either off by default or scaled to the total available memory.

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
##########
@@ -18,62 +18,126 @@
 package org.apache.beam.runners.dataflow.worker.util;
 
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor.Guard;
 
-/** Executor that blocks on execute() if its queue is full. */
+/** An executor for executing work on windmill items. */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class BoundedQueueExecutor extends ThreadPoolExecutor {
-  private static class ReducableSemaphore extends Semaphore {
-    ReducableSemaphore(int permits) {
-      super(permits);
-    }
+public class BoundedQueueExecutor {
+  private final ThreadPoolExecutor executor;
+  private final int maximumElementsOutstanding;
+  private final long maximumBytesOutstanding;
 
-    @Override
-    public void reducePermits(int permits) {
-      super.reducePermits(permits);
-    }
-  }
-
-  private ReducableSemaphore semaphore;
+  private final Monitor monitor = new Monitor();
+  private int elementsOutstanding = 0;
+  private long bytesOutstanding = 0;
 
   public BoundedQueueExecutor(
       int maximumPoolSize,
       long keepAliveTime,
       TimeUnit unit,
-      int maximumQueueSize,
+      int maximumElementsOutstanding,
+      long maximumBytesOutstanding,
       ThreadFactory threadFactory) {
-    super(
-        maximumPoolSize,
-        maximumPoolSize,
-        keepAliveTime,
-        unit,
-        new LinkedBlockingQueue<Runnable>(),
-        threadFactory);
-    this.semaphore = new ReducableSemaphore(maximumQueueSize);
-    allowCoreThreadTimeOut(true);
+    executor =
+        new ThreadPoolExecutor(
+            maximumPoolSize,
+            maximumPoolSize,
+            keepAliveTime,
+            unit,
+            new LinkedBlockingQueue<>(),
+            threadFactory);
+    executor.allowCoreThreadTimeOut(true);
+    this.maximumElementsOutstanding = maximumElementsOutstanding;
+    this.maximumBytesOutstanding = maximumBytesOutstanding;
   }
 
-  // Before adding a Runnable to the queue, acquire the semaphore.
-  @Override
-  public void execute(Runnable r) {
-    semaphore.acquireUninterruptibly();
-    super.execute(r);
+  // Before adding a Work to the queue, check that there are enough bytes of space or no other
+  // outstanding elements of work.
+  public void execute(Runnable work, long workBytes) {
+    monitor.enterWhenUninterruptibly(
+        new Guard(monitor) {
+          @Override
+          public boolean isSatisfied() {
+            return elementsOutstanding == 0
+                || (bytesAvailable() >= workBytes
+                    && elementsOutstanding < maximumElementsOutstanding);
+          }
+        });
+    Runnable workRunnable =
+        () -> {
+          try {
+            work.run();
+          } finally {
+            monitor.enter();
+            --elementsOutstanding;
+            bytesOutstanding -= workBytes;
+            monitor.leave();
+          }
+        };
+    try {
+      bytesOutstanding += workBytes;
+      ++elementsOutstanding;
+      executor.execute(workRunnable);
+    } finally {
+      monitor.leave();
+    }
   }
 
   // Forcibly add something to the queue, ignoring the length limit.
   public void forceExecute(Runnable r) {
-    semaphore.reducePermits(1);
-    super.execute(r);
+    executor.execute(r);

Review comment:
       Should this still increase the elements and bytes so that other unforced executions respect the total?







[GitHub] [beam] dpcollins-google commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r812422275



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -195,6 +195,8 @@
   // retrieving extra work from Windmill without working on it, leading to better
   // prioritization / utilization.
   static final int MAX_WORK_UNITS_QUEUED = 100;
+  // Maximum bytes of WorkItems being processed in the work queue at a time.
+  static final int MAX_WORK_UNITS_BYTES = 500 << 20; // 500MB

Review comment:
       Done. This is now configurable: if someone experiences a performance regression (which should be unlikely), they can set the byte limit in the pipeline options to an arbitrarily large number.







[GitHub] [beam] dpcollins-google commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r813218568



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -195,6 +195,8 @@
   // retrieving extra work from Windmill without working on it, leading to better
   // prioritization / utilization.
   static final int MAX_WORK_UNITS_QUEUED = 100;
+  // Maximum bytes of WorkItems being processed in the work queue at a time.
+  static final int MAX_WORK_UNITS_BYTES = 500 << 20; // 500MB

Review comment:
       This now defaults to 50% of the JVM's maximum memory.
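
As a rough illustration of that default, here is a minimal sketch. The class `WorkItemByteLimit` and its methods are hypothetical names, not the PR's actual code, and treating a non-positive configured value as "unset" is an assumption made for the example. Only `Runtime.maxMemory()` is a real JDK call.

```java
/** Hypothetical helper; the names here are illustrative, not taken from the PR. */
final class WorkItemByteLimit {
  private WorkItemByteLimit() {}

  /**
   * Returns the configured limit if it is positive; otherwise falls back to
   * half of the given maximum heap size (assumption: non-positive means unset).
   */
  static long choose(long configuredBytes, long jvmMaxBytes) {
    if (configuredBytes > 0) {
      return configuredBytes;
    }
    return jvmMaxBytes / 2;
  }

  /** Same as choose(), using the maximum memory reported by the running JVM. */
  static long chooseForThisJvm(long configuredBytes) {
    // Runtime.maxMemory() is the most memory the JVM will attempt to use.
    return choose(configuredBytes, Runtime.getRuntime().maxMemory());
  }
}
```

With this shape, a user who hits a regression could pass a very large configured value and effectively disable the byte limit.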







[GitHub] [beam] scwhittle commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
scwhittle commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r813193152



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -785,6 +779,18 @@ private int chooseMaximumNumberOfThreads() {
     return MAX_PROCESSING_THREADS;
   }
 
+  private int chooseMaximumBundlesOutstanding() {
+    return Math.max(options.getMaxBundlesFromWindmillOutstanding(), chooseMaximumNumberOfThreads());

Review comment:
       I think the previous limit was 100 + chooseMaximumNumberOfThreads(); should we keep that?
   The number requested was previously fixed at 400, but the active plus queued total was 100 queued plus the number of active threads.
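
To make the difference concrete, the two bounds can be compared with a small sketch. `OutstandingBundles` and its method names are hypothetical; only the constant 100 (`MAX_WORK_UNITS_QUEUED`) and the `Math.max` form come from the diff.

```java
/** Hypothetical comparison of the two bounds discussed in this comment. */
final class OutstandingBundles {
  static final int MAX_WORK_UNITS_QUEUED = 100;

  private OutstandingBundles() {}

  /** Previous behavior as described above: queued limit plus active threads. */
  static int previousBound(int numThreads) {
    return MAX_WORK_UNITS_QUEUED + numThreads;
  }

  /** The form in the diff under review: the larger of the option and the thread count. */
  static int maxBasedBound(int configuredOutstanding, int numThreads) {
    return Math.max(configuredOutstanding, numThreads);
  }
}
```

For example, with 300 threads and an option value of 100, the previous bound is 400 while the `Math.max` form yields 300, so the new form admits fewer outstanding bundles.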

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
##########
@@ -18,62 +18,126 @@
 package org.apache.beam.runners.dataflow.worker.util;
 
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor.Guard;
 
-/** Executor that blocks on execute() if its queue is full. */
+/** An executor for executing work on windmill items. */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class BoundedQueueExecutor extends ThreadPoolExecutor {
-  private static class ReducableSemaphore extends Semaphore {
-    ReducableSemaphore(int permits) {
-      super(permits);
-    }
+public class BoundedQueueExecutor {
+  private final ThreadPoolExecutor executor;
+  private final int maximumElementsOutstanding;
+  private final long maximumBytesOutstanding;
 
-    @Override
-    public void reducePermits(int permits) {
-      super.reducePermits(permits);
-    }
-  }
-
-  private ReducableSemaphore semaphore;
+  private final Monitor monitor = new Monitor();
+  private int elementsOutstanding = 0;
+  private long bytesOutstanding = 0;
 
   public BoundedQueueExecutor(
       int maximumPoolSize,
       long keepAliveTime,
       TimeUnit unit,
-      int maximumQueueSize,
+      int maximumElementsOutstanding,
+      long maximumBytesOutstanding,
       ThreadFactory threadFactory) {
-    super(
-        maximumPoolSize,
-        maximumPoolSize,
-        keepAliveTime,
-        unit,
-        new LinkedBlockingQueue<Runnable>(),
-        threadFactory);
-    this.semaphore = new ReducableSemaphore(maximumQueueSize);
-    allowCoreThreadTimeOut(true);
+    executor =
+        new ThreadPoolExecutor(
+            maximumPoolSize,
+            maximumPoolSize,
+            keepAliveTime,
+            unit,
+            new LinkedBlockingQueue<>(),
+            threadFactory);
+    executor.allowCoreThreadTimeOut(true);
+    this.maximumElementsOutstanding = maximumElementsOutstanding;
+    this.maximumBytesOutstanding = maximumBytesOutstanding;
   }
 
-  // Before adding a Runnable to the queue, acquire the semaphore.
-  @Override
-  public void execute(Runnable r) {
-    semaphore.acquireUninterruptibly();
-    super.execute(r);
+  // Before adding a Work to the queue, check that there are enough bytes of space or no other
+  // outstanding elements of work.
+  public void execute(Runnable work, long workBytes) {
+    monitor.enterWhenUninterruptibly(
+        new Guard(monitor) {
+          @Override
+          public boolean isSatisfied() {
+            return elementsOutstanding == 0
+                || (bytesAvailable() >= workBytes
+                    && elementsOutstanding < maximumElementsOutstanding);
+          }
+        });
+    Runnable workRunnable =
+        () -> {
+          try {
+            work.run();
+          } finally {
+            monitor.enter();
+            --elementsOutstanding;
+            bytesOutstanding -= workBytes;
+            monitor.leave();
+          }
+        };
+    try {
+      bytesOutstanding += workBytes;
+      ++elementsOutstanding;
+      executor.execute(workRunnable);
+    } finally {
+      monitor.leave();
+    }
   }
 
   // Forcibly add something to the queue, ignoring the length limit.
   public void forceExecute(Runnable r) {
-    semaphore.reducePermits(1);
-    super.execute(r);
+    executor.execute(r);

Review comment:
       Some of the existing forceExecute calls are for retries of work items (after a user exception, for example), in which case it seems like we would want to keep counting the bytes and elements; otherwise we could get OOMs.
   
   The other use appears to be finalization callbacks, which I agree don't have a clear byte or element count.
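
A minimal sketch of the accounting being asked for, under stated assumptions: `WorkBudget` is a hypothetical class, it uses plain `synchronized` methods instead of the Guava `Monitor` from the diff, and it assumes `bytesAvailable()` means the maximum minus the bytes outstanding. A forced retry skips the admission check but is still counted, so later unforced work sees the true total.

```java
/** Hypothetical budget tracker; not the PR's implementation. */
final class WorkBudget {
  private final int maxElements;
  private final long maxBytes;
  private int elements = 0;
  private long bytes = 0;

  WorkBudget(int maxElements, long maxBytes) {
    this.maxElements = maxElements;
    this.maxBytes = maxBytes;
  }

  /** Non-blocking version of the guard in the diff: admit only if there is room. */
  synchronized boolean tryAdmit(long workBytes) {
    boolean ok =
        elements == 0 || (maxBytes - bytes >= workBytes && elements < maxElements);
    if (ok) {
      elements++;
      bytes += workBytes;
    }
    return ok;
  }

  /** Forced path (for example a retry): never refused, but still counted. */
  synchronized void forceAdmit(long workBytes) {
    elements++;
    bytes += workBytes;
  }

  /** Called when a unit of work finishes. */
  synchronized void release(long workBytes) {
    elements--;
    bytes -= workBytes;
  }

  synchronized long bytesOutstanding() {
    return bytes;
  }
}
```

Finalization callbacks could use the same forced path with a byte count of zero, which matches the `forceExecute(r, 0)` overload that appears later in this thread.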







[GitHub] [beam] dpcollins-google commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r813234770



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -785,6 +779,18 @@ private int chooseMaximumNumberOfThreads() {
     return MAX_PROCESSING_THREADS;
   }
 
+  private int chooseMaximumBundlesOutstanding() {
+    return Math.max(options.getMaxBundlesFromWindmillOutstanding(), chooseMaximumNumberOfThreads());

Review comment:
       I've fixed this to match the current behavior in the case that the number of threads is modified.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] scwhittle commented on pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
scwhittle commented on pull request #16901:
URL: https://github.com/apache/beam/pull/16901#issuecomment-1050011783


   LGTM, thanks!





[GitHub] [beam] nbali commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
nbali commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r812369260



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -195,6 +195,8 @@
   // retrieving extra work from Windmill without working on it, leading to better
   // prioritization / utilization.
   static final int MAX_WORK_UNITS_QUEUED = 100;
+  // Maximum bytes of WorkItems being processed in the work queue at a time.
+  static final int MAX_WORK_UNITS_BYTES = 500 << 20; // 500MB

Review comment:
       Following that logic `MAX_WORK_UNITS_QUEUED` should be configurable as well then. Just like how having a memory limit can throttle performance if the work units are big, having a count limit when the work units are small does the same.
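
The interaction of the two limits can be seen by writing the admission condition from the diff as a pure function. This is only a sketch: `AdmissionGuard` is a hypothetical name, and it assumes `bytesAvailable()` is the maximum minus the bytes outstanding, which the quoted diff does not show.

```java
/** Hypothetical restatement of the guard condition as a pure function. */
final class AdmissionGuard {
  private AdmissionGuard() {}

  static boolean admits(
      int elementsOutstanding,
      long bytesOutstanding,
      int maxElements,
      long maxBytes,
      long workBytes) {
    // A lone item is always admitted, even if it exceeds the byte limit,
    // so that oversized work cannot block forever.
    return elementsOutstanding == 0
        || (maxBytes - bytesOutstanding >= workBytes && elementsOutstanding < maxElements);
  }
}
```

With 100 tiny items outstanding and a count limit of 100, the next item is refused even though almost no memory is in use, which is the small-work-unit throttling described above. With a few large items the byte limit is what refuses the next one.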







[GitHub] [beam] kennknowles commented on pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #16901:
URL: https://github.com/apache/beam/pull/16901#issuecomment-1050026970


   Run Spotless PreCommit





[GitHub] [beam] scwhittle commented on a change in pull request #16901: Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill

Posted by GitBox <gi...@apache.org>.
scwhittle commented on a change in pull request #16901:
URL: https://github.com/apache/beam/pull/16901#discussion_r814013955



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
##########
@@ -18,62 +18,135 @@
 package org.apache.beam.runners.dataflow.worker.util;
 
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor.Guard;
 
-/** Executor that blocks on execute() if its queue is full. */
+/** An executor for executing work on windmill items. */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class BoundedQueueExecutor extends ThreadPoolExecutor {
-  private static class ReducableSemaphore extends Semaphore {
-    ReducableSemaphore(int permits) {
-      super(permits);
-    }
-
-    @Override
-    public void reducePermits(int permits) {
-      super.reducePermits(permits);
-    }
-  }
+public class BoundedQueueExecutor {
+  private final ThreadPoolExecutor executor;
+  private final int maximumElementsOutstanding;
+  private final long maximumBytesOutstanding;
 
-  private ReducableSemaphore semaphore;
+  private final Monitor monitor = new Monitor();
+  private int elementsOutstanding = 0;
+  private long bytesOutstanding = 0;
 
   public BoundedQueueExecutor(
       int maximumPoolSize,
       long keepAliveTime,
       TimeUnit unit,
-      int maximumQueueSize,
+      int maximumElementsOutstanding,
+      long maximumBytesOutstanding,
       ThreadFactory threadFactory) {
-    super(
-        maximumPoolSize,
-        maximumPoolSize,
-        keepAliveTime,
-        unit,
-        new LinkedBlockingQueue<Runnable>(),
-        threadFactory);
-    this.semaphore = new ReducableSemaphore(maximumQueueSize);
-    allowCoreThreadTimeOut(true);
+    executor =
+        new ThreadPoolExecutor(
+            maximumPoolSize,
+            maximumPoolSize,
+            keepAliveTime,
+            unit,
+            new LinkedBlockingQueue<>(),
+            threadFactory);
+    executor.allowCoreThreadTimeOut(true);
+    this.maximumElementsOutstanding = maximumElementsOutstanding;
+    this.maximumBytesOutstanding = maximumBytesOutstanding;
+  }
+
+  // Before adding a Work to the queue, check that there are enough bytes of space or no other
+  // outstanding elements of work.
+  public void execute(Runnable work, long workBytes) {
+    monitor.enterWhenUninterruptibly(
+        new Guard(monitor) {
+          @Override
+          public boolean isSatisfied() {
+            return elementsOutstanding == 0
+                || (bytesAvailable() >= workBytes
+                    && elementsOutstanding < maximumElementsOutstanding);
+          }
+        });
+    executeLockHeld(work, workBytes);
   }
 
-  // Before adding a Runnable to the queue, acquire the semaphore.
-  @Override
-  public void execute(Runnable r) {
-    semaphore.acquireUninterruptibly();
-    super.execute(r);
+  public void forceExecute(Runnable work, long workBytes) {
+    monitor.enter();
+    executeLockHeld(work, workBytes);
   }
 
   // Forcibly add something to the queue, ignoring the length limit.
   public void forceExecute(Runnable r) {
-    semaphore.reducePermits(1);
-    super.execute(r);
+    forceExecute(r, 0);
+  }
+
+  public void shutdown() throws InterruptedException {
+    executor.shutdown();
+    if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
+      throw new RuntimeException("Work executor did not terminate within 5 minutes");
+    }
+  }
+
+  public boolean executorQueueIsEmpty() {
+    return executor.getQueue().isEmpty();
+  }
+
+  public String summaryHtml() {
+    monitor.enter();
+    try {
+      StringBuilder builder = new StringBuilder();
+      builder.append("Worker Threads: ");
+      builder.append(executor.getPoolSize());
+      builder.append("/");
+      builder.append(executor.getMaximumPoolSize());
+      builder.append("<br>/n");
+
+      builder.append("Active Threads: ");
+      builder.append(executor.getActiveCount());
+      builder.append("<br>/n");
+
+      builder.append("Work Queue Size: ");
+      builder.append(elementsOutstanding);
+      builder.append("/");
+      builder.append(maximumElementsOutstanding);
+      builder.append("<br>/n");
+
+      builder.append("Work Queue Bytes: ");
+      builder.append(bytesOutstanding);
+      builder.append("/");
+      builder.append(maximumBytesOutstanding);
+      builder.append("<br>/n");
+
+      return builder.toString();
+    } finally {
+      monitor.leave();
+    }
+  }
+
+  private void executeLockHeld(Runnable work, long workBytes) {
+    Runnable workRunnable =
+        () -> {
+          try {
+            work.run();
+          } finally {
+            monitor.enter();
+            --elementsOutstanding;
+            bytesOutstanding -= workBytes;
+            monitor.leave();
+          }
+        };
+    try {
+      bytesOutstanding += workBytes;
+      ++elementsOutstanding;
+      executor.execute(workRunnable);

Review comment:
       It seems like you should reduce the monitor scope to just incrementing the counters, then create the runnable and call execute outside of it.
   
   Otherwise this might deadlock if forceExecute calls have pushed the executor queue over the maximum number of elements and execute blocks on queueing with the monitor held. Work then won't be able to enter the monitor and decrement the counters when done, and the executor's threads will be tied up.
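
One possible shape for that suggestion is sketched below. It is not the PR's code: `NarrowLockExecutor` is a hypothetical class, it swaps the Guava `Monitor` for a JDK `ReentrantLock` with a `Condition` so the example is self-contained, and it uses a fixed pool of two threads purely for illustration. The point is that the lock covers only the wait and the counter updates, while `executor.execute` runs with no lock held.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: counters are guarded, submission happens outside the lock. */
final class NarrowLockExecutor {
  private final ExecutorService executor = Executors.newFixedThreadPool(2);
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition released = lock.newCondition();
  private final int maxElements;
  private final long maxBytes;
  private int elements = 0;
  private long bytes = 0;

  NarrowLockExecutor(int maxElements, long maxBytes) {
    this.maxElements = maxElements;
    this.maxBytes = maxBytes;
  }

  void execute(Runnable work, long workBytes) {
    lock.lock();
    try {
      // Wait until there is room, or until nothing else is outstanding.
      while (!(elements == 0
          || (maxBytes - bytes >= workBytes && elements < maxElements))) {
        released.awaitUninterruptibly();
      }
      elements++;
      bytes += workBytes;
    } finally {
      lock.unlock();
    }
    // The lock is no longer held here, so finishing work can always decrement.
    executor.execute(
        () -> {
          try {
            work.run();
          } finally {
            release(workBytes);
          }
        });
  }

  private void release(long workBytes) {
    lock.lock();
    try {
      elements--;
      bytes -= workBytes;
      released.signalAll();
    } finally {
      lock.unlock();
    }
  }

  long bytesOutstanding() {
    lock.lock();
    try {
      return bytes;
    } finally {
      lock.unlock();
    }
  }

  /** Stops accepting work and waits up to one minute for running work to finish. */
  boolean shutdownAndWait() {
    executor.shutdown();
    try {
      return executor.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```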

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
##########
@@ -18,62 +18,135 @@
 package org.apache.beam.runners.dataflow.worker.util;
 
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor.Guard;
 
-/** Executor that blocks on execute() if its queue is full. */
+/** An executor for executing work on windmill items. */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class BoundedQueueExecutor extends ThreadPoolExecutor {
-  private static class ReducableSemaphore extends Semaphore {
-    ReducableSemaphore(int permits) {
-      super(permits);
-    }
-
-    @Override
-    public void reducePermits(int permits) {
-      super.reducePermits(permits);
-    }
-  }
+public class BoundedQueueExecutor {
+  private final ThreadPoolExecutor executor;
+  private final int maximumElementsOutstanding;
+  private final long maximumBytesOutstanding;
 
-  private ReducableSemaphore semaphore;
+  private final Monitor monitor = new Monitor();
+  private int elementsOutstanding = 0;
+  private long bytesOutstanding = 0;
 
   public BoundedQueueExecutor(
       int maximumPoolSize,
       long keepAliveTime,
       TimeUnit unit,
-      int maximumQueueSize,
+      int maximumElementsOutstanding,
+      long maximumBytesOutstanding,
       ThreadFactory threadFactory) {
-    super(
-        maximumPoolSize,
-        maximumPoolSize,
-        keepAliveTime,
-        unit,
-        new LinkedBlockingQueue<Runnable>(),
-        threadFactory);
-    this.semaphore = new ReducableSemaphore(maximumQueueSize);
-    allowCoreThreadTimeOut(true);
+    executor =
+        new ThreadPoolExecutor(
+            maximumPoolSize,
+            maximumPoolSize,
+            keepAliveTime,
+            unit,
+            new LinkedBlockingQueue<>(),
+            threadFactory);
+    executor.allowCoreThreadTimeOut(true);
+    this.maximumElementsOutstanding = maximumElementsOutstanding;
+    this.maximumBytesOutstanding = maximumBytesOutstanding;
+  }
+
+  // Before adding a Work to the queue, check that there are enough bytes of space or no other
+  // outstanding elements of work.
+  public void execute(Runnable work, long workBytes) {
+    monitor.enterWhenUninterruptibly(
+        new Guard(monitor) {
+          @Override
+          public boolean isSatisfied() {
+            return elementsOutstanding == 0
+                || (bytesAvailable() >= workBytes
+                    && elementsOutstanding < maximumElementsOutstanding);
+          }
+        });
+    executeLockHeld(work, workBytes);
   }
 
-  // Before adding a Runnable to the queue, acquire the semaphore.
-  @Override
-  public void execute(Runnable r) {
-    semaphore.acquireUninterruptibly();
-    super.execute(r);
+  public void forceExecute(Runnable work, long workBytes) {

Review comment:
       Please add a comment here.

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
##########
@@ -18,62 +18,135 @@
 package org.apache.beam.runners.dataflow.worker.util;
 
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor.Guard;
 
-/** Executor that blocks on execute() if its queue is full. */
+/** An executor for executing work on windmill items. */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class BoundedQueueExecutor extends ThreadPoolExecutor {
-  private static class ReducableSemaphore extends Semaphore {
-    ReducableSemaphore(int permits) {
-      super(permits);
-    }
-
-    @Override
-    public void reducePermits(int permits) {
-      super.reducePermits(permits);
-    }
-  }
+public class BoundedQueueExecutor {
+  private final ThreadPoolExecutor executor;
+  private final int maximumElementsOutstanding;
+  private final long maximumBytesOutstanding;
 
-  private ReducableSemaphore semaphore;
+  private final Monitor monitor = new Monitor();
+  private int elementsOutstanding = 0;
+  private long bytesOutstanding = 0;
 
   public BoundedQueueExecutor(
       int maximumPoolSize,
       long keepAliveTime,
       TimeUnit unit,
-      int maximumQueueSize,
+      int maximumElementsOutstanding,
+      long maximumBytesOutstanding,
       ThreadFactory threadFactory) {
-    super(
-        maximumPoolSize,
-        maximumPoolSize,
-        keepAliveTime,
-        unit,
-        new LinkedBlockingQueue<Runnable>(),
-        threadFactory);
-    this.semaphore = new ReducableSemaphore(maximumQueueSize);
-    allowCoreThreadTimeOut(true);
+    executor =
+        new ThreadPoolExecutor(
+            maximumPoolSize,
+            maximumPoolSize,
+            keepAliveTime,
+            unit,
+            new LinkedBlockingQueue<>(),
+            threadFactory);
+    executor.allowCoreThreadTimeOut(true);
+    this.maximumElementsOutstanding = maximumElementsOutstanding;
+    this.maximumBytesOutstanding = maximumBytesOutstanding;
+  }
+
+  // Before adding a Work to the queue, check that there are enough bytes of space or no other
+  // outstanding elements of work.
+  public void execute(Runnable work, long workBytes) {
+    monitor.enterWhenUninterruptibly(
+        new Guard(monitor) {
+          @Override
+          public boolean isSatisfied() {
+            return elementsOutstanding == 0
+                || (bytesAvailable() >= workBytes
+                    && elementsOutstanding < maximumElementsOutstanding);
+          }
+        });
+    executeLockHeld(work, workBytes);
   }
 
-  // Before adding a Runnable to the queue, acquire the semaphore.
-  @Override
-  public void execute(Runnable r) {
-    semaphore.acquireUninterruptibly();
-    super.execute(r);
+  public void forceExecute(Runnable work, long workBytes) {
+    monitor.enter();
+    executeLockHeld(work, workBytes);
   }
 
   // Forcibly add something to the queue, ignoring the length limit.
   public void forceExecute(Runnable r) {
-    semaphore.reducePermits(1);
-    super.execute(r);
+    forceExecute(r, 0);
+  }
+
+  public void shutdown() throws InterruptedException {
+    executor.shutdown();
+    if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
+      throw new RuntimeException("Work executor did not terminate within 5 minutes");
+    }
+  }
+
+  public boolean executorQueueIsEmpty() {
+    return executor.getQueue().isEmpty();
+  }
+
+  public String summaryHtml() {
+    monitor.enter();
+    try {
+      StringBuilder builder = new StringBuilder();
+      builder.append("Worker Threads: ");
+      builder.append(executor.getPoolSize());
+      builder.append("/");
+      builder.append(executor.getMaximumPoolSize());
+      builder.append("<br>/n");
+
+      builder.append("Active Threads: ");
+      builder.append(executor.getActiveCount());
+      builder.append("<br>/n");
+
+      builder.append("Work Queue Size: ");
+      builder.append(elementsOutstanding);
+      builder.append("/");
+      builder.append(maximumElementsOutstanding);
+      builder.append("<br>/n");
+
+      builder.append("Work Queue Bytes: ");
+      builder.append(bytesOutstanding);
+      builder.append("/");
+      builder.append(maximumBytesOutstanding);
+      builder.append("<br>/n");
+
+      return builder.toString();
+    } finally {
+      monitor.leave();
+    }
+  }
+
+  private void executeLockHeld(Runnable work, long workBytes) {
+    Runnable workRunnable =
+        () -> {
+          try {
+            work.run();
+          } finally {
+            monitor.enter();
+            --elementsOutstanding;
+            bytesOutstanding -= workBytes;
+            monitor.leave();
+          }
+        };
+    try {
+      bytesOutstanding += workBytes;
+      ++elementsOutstanding;
+      executor.execute(workRunnable);
+    } finally {
+      monitor.leave();

Review comment:
       Can adding to the executor throw an exception? If so, you should decrement the counters before the exception propagates, in case it gets caught and handled above.
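
For reference, `ThreadPoolExecutor.execute` can throw `RejectedExecutionException`, for instance after `shutdown()` under the default rejection policy. A minimal sketch of the rollback follows. `RollbackOnReject` is a hypothetical class, and it uses `synchronized` rather than the Guava `Monitor` from the diff to stay self-contained.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: undo the counter updates when submission fails. */
final class RollbackOnReject {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private int elements = 0;
  private long bytes = 0;

  void submit(Runnable work, long workBytes) {
    synchronized (this) {
      elements++;
      bytes += workBytes;
    }
    try {
      executor.execute(
          () -> {
            try {
              work.run();
            } finally {
              release(workBytes);
            }
          });
    } catch (RuntimeException | Error e) {
      // The runnable will never run, so its finally block cannot release.
      release(workBytes);
      throw e;
    }
  }

  private synchronized void release(long workBytes) {
    elements--;
    bytes -= workBytes;
  }

  synchronized long bytesOutstanding() {
    return bytes;
  }

  void shutdown() {
    executor.shutdown();
  }
}
```

Without the rollback, a caller that catches the rejection and carries on would leave the byte and element counts permanently inflated.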

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
##########
@@ -18,62 +18,135 @@
 package org.apache.beam.runners.dataflow.worker.util;
 
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Monitor.Guard;
 
-/** Executor that blocks on execute() if its queue is full. */
+/** An executor for executing work on windmill items. */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class BoundedQueueExecutor extends ThreadPoolExecutor {
-  private static class ReducableSemaphore extends Semaphore {
-    ReducableSemaphore(int permits) {
-      super(permits);
-    }
-
-    @Override
-    public void reducePermits(int permits) {
-      super.reducePermits(permits);
-    }
-  }
+public class BoundedQueueExecutor {
+  private final ThreadPoolExecutor executor;
+  private final int maximumElementsOutstanding;
+  private final long maximumBytesOutstanding;
 
-  private ReducableSemaphore semaphore;
+  private final Monitor monitor = new Monitor();
+  private int elementsOutstanding = 0;
+  private long bytesOutstanding = 0;
 
   public BoundedQueueExecutor(
       int maximumPoolSize,
       long keepAliveTime,
       TimeUnit unit,
-      int maximumQueueSize,
+      int maximumElementsOutstanding,
+      long maximumBytesOutstanding,
       ThreadFactory threadFactory) {
-    super(
-        maximumPoolSize,
-        maximumPoolSize,
-        keepAliveTime,
-        unit,
-        new LinkedBlockingQueue<Runnable>(),
-        threadFactory);
-    this.semaphore = new ReducableSemaphore(maximumQueueSize);
-    allowCoreThreadTimeOut(true);
+    executor =
+        new ThreadPoolExecutor(
+            maximumPoolSize,
+            maximumPoolSize,
+            keepAliveTime,
+            unit,
+            new LinkedBlockingQueue<>(),
+            threadFactory);
+    executor.allowCoreThreadTimeOut(true);
+    this.maximumElementsOutstanding = maximumElementsOutstanding;
+    this.maximumBytesOutstanding = maximumBytesOutstanding;
+  }
+
+  // Before adding a Work to the queue, check that there are enough bytes of space or no other
+  // outstanding elements of work.
+  public void execute(Runnable work, long workBytes) {
+    monitor.enterWhenUninterruptibly(
+        new Guard(monitor) {
+          @Override
+          public boolean isSatisfied() {
+            return elementsOutstanding == 0
+                || (bytesAvailable() >= workBytes
+                    && elementsOutstanding < maximumElementsOutstanding);
+          }
+        });
+    executeLockHeld(work, workBytes);
   }
 
-  // Before adding a Runnable to the queue, acquire the semaphore.
-  @Override
-  public void execute(Runnable r) {
-    semaphore.acquireUninterruptibly();
-    super.execute(r);
+  public void forceExecute(Runnable work, long workBytes) {
+    monitor.enter();
+    executeLockHeld(work, workBytes);
   }
 
   // Forcibly add something to the queue, ignoring the length limit.
   public void forceExecute(Runnable r) {
-    semaphore.reducePermits(1);
-    super.execute(r);
+    forceExecute(r, 0);

Review comment:
       nit: since this is kind of gross, I'd remove it from this class and make the caller pass zero. Then callers at least realize they should pass real bytes or use another executor.



