You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/30 00:45:52 UTC

[GitHub] [beam] lukecwik opened a new pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

lukecwik opened a new pull request #15849:
URL: https://github.com/apache/beam/pull/15849


   Also update one location simplifying the queue interaction semantics to have the StreamObserver only interacted from the queue draining thread instead of relying on synchronization.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   `ValidatesRunner` compliance status (on master branch)
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/badge/icon?subject=V1+Streaming">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon?subject=V1+Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/badge/icon?subject=V2+Streaming">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon?subject=Java+8">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon?subject=Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon?subject=Portable+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/badge/icon?subject=Portable">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon?subject=Structured+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon?subject=ValCont">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Examples testing status on various runners
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/badge/icon?subject=V1+Java11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Post-Commit SDK/Transform Integration Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Go</th>
         <th>Java</th>
         <th>Python</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon?subject=3.6">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon?subject=3.7">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon?subject=3.8">
           </a>
         </td>
       </tr>
     </tbody>
   </table>
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>---</th>
         <th>Java</th>
         <th>Python</th>
         <th>Go</th>
         <th>Website</th>
         <th>Whitespace</th>
         <th>Typescript</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Non-portable</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon?subject=Tests">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon?subject=Lint">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon?subject=Docker">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon?subject=Docs">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Portable</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15849:
URL: https://github.com/apache/beam/pull/15849#issuecomment-956445096


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15849:
URL: https://github.com/apache/beam/pull/15849#discussion_r742090691



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -41,25 +37,39 @@
  * becomes ready.
  */
 @ThreadSafe
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+public final class BufferingStreamObserver<T extends @NonNull Object> implements StreamObserver<T> {

Review comment:
       Yes.
   
   The gRPC StreamObservers are not threadsafe across multiple threads so either you need to have threads synchronize or use a queue and have a dedicated thread writing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15849:
URL: https://github.com/apache/beam/pull/15849#issuecomment-961262291


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15849:
URL: https://github.com/apache/beam/pull/15849#issuecomment-955117144


   R: @youngoli @ihji 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15849:
URL: https://github.com/apache/beam/pull/15849#issuecomment-959994608


   Actually, I was misremembering. The issue is that the onError below passes in a Throwable instead of an Exception and I wanted to keep CancellableQueue limited to Exceptions and not throwables so I was wrapping the throwable into an exception and passing that through.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15849:
URL: https://github.com/apache/beam/pull/15849#discussion_r742089863



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
           if (value != POISON_PILL) {
             outboundObserver.onNext(value);
           } else {
+            outboundObserver.onCompleted();
             return;
           }
         }
         phaser.awaitAdvance(currentPhase);
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException(e);
+    } catch (OnErrorException e) {
+      outboundObserver.onError(e.getCause());

Review comment:
       The main reason is to pass through the exception from the caller so that I know that this is the case where we want to pull out the cause and not pass through the top level exception.
   
   Avoiding another cancel doesn't really do much as you already mentioned.

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -41,25 +37,39 @@
  * becomes ready.
  */
 @ThreadSafe
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+public final class BufferingStreamObserver<T extends @NonNull Object> implements StreamObserver<T> {

Review comment:
       Yes.
   
   The gRPC StreamObservers are typically not threadsafe across multiple threads so either you need to have threads synchronize or use a queue and have a dedicated thread writing.

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -41,25 +37,39 @@
  * becomes ready.
  */
 @ThreadSafe
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+public final class BufferingStreamObserver<T extends @NonNull Object> implements StreamObserver<T> {

Review comment:
       Yes.
   
   The gRPC StreamObservers are not threadsafe across multiple threads so either you need to have threads synchronize or use a queue and have a dedicated thread writing.

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
           if (value != POISON_PILL) {
             outboundObserver.onNext(value);
           } else {
+            outboundObserver.onCompleted();
             return;
           }
         }
         phaser.awaitAdvance(currentPhase);
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException(e);
+    } catch (OnErrorException e) {
+      outboundObserver.onError(e.getCause());

Review comment:
       The main reason is to pass through the exception from the caller so that I know that this is the case where we want to pull out the cause and not pass through the top level exception.
   
   Avoiding another cancel doesn't really do much as you already mentioned.

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -41,25 +37,39 @@
  * becomes ready.
  */
 @ThreadSafe
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+public final class BufferingStreamObserver<T extends @NonNull Object> implements StreamObserver<T> {

Review comment:
       Yes.
   
   The gRPC StreamObservers are typically not threadsafe across multiple threads so either you need to have threads synchronize or use a queue and have a dedicated thread writing.

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -41,25 +37,39 @@
  * becomes ready.
  */
 @ThreadSafe
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+public final class BufferingStreamObserver<T extends @NonNull Object> implements StreamObserver<T> {

Review comment:
       Yes.
   
   The gRPC StreamObservers are not threadsafe across multiple threads so either you need to have threads synchronize or use a queue and have a dedicated thread writing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli commented on a change in pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #15849:
URL: https://github.com/apache/beam/pull/15849#discussion_r741625279



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
           if (value != POISON_PILL) {
             outboundObserver.onNext(value);
           } else {
+            outboundObserver.onCompleted();
             return;
           }
         }
         phaser.awaitAdvance(currentPhase);
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException(e);
+    } catch (OnErrorException e) {
+      outboundObserver.onError(e.getCause());

Review comment:
       As far as I can tell, this spot in the code is the only reason to have `OnErrorException`, and the only difference seems to be to avoid a redundant `queue.cancel` (it gets cancelled in `onError` and then again here). Is calling `queue.cancel` twice enough of a problem to warrant this?

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -41,25 +37,39 @@
  * becomes ready.
  */
 @ThreadSafe
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+public final class BufferingStreamObserver<T extends @NonNull Object> implements StreamObserver<T> {

Review comment:
       Just to make sure I'm understanding this class correctly, it is essentially working on two separate threads, right? One thread is the calls to the public API (i.e. the methods of `StreamObserver`, so `onNext`, `onCompleted`, and `onError`), while the other thread is the executor calling `drainQueue` which is the thread that actually does all the interaction with the underlying `outboundObserver`.
   
   Am I understanding this class correctly?

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
           if (value != POISON_PILL) {
             outboundObserver.onNext(value);
           } else {
+            outboundObserver.onCompleted();
             return;
           }
         }
         phaser.awaitAdvance(currentPhase);
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException(e);
+    } catch (OnErrorException e) {
+      outboundObserver.onError(e.getCause());

Review comment:
       Ack. I was just thinking that the general Exception catch already calls `outboundObserver.onError(e)`, so if you cancel  the queue with the top-level exception it would be the same. Or am I misunderstanding what happens with `getCause`? I'm reading it as getting the original exception that was nested in the `OnErrorException`, but maybe it's a more top level exception than that?
   
   Regardless, this is all nit-picking at this point. Not worth blocking approval on. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli commented on a change in pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #15849:
URL: https://github.com/apache/beam/pull/15849#discussion_r741625279



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
           if (value != POISON_PILL) {
             outboundObserver.onNext(value);
           } else {
+            outboundObserver.onCompleted();
             return;
           }
         }
         phaser.awaitAdvance(currentPhase);
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException(e);
+    } catch (OnErrorException e) {
+      outboundObserver.onError(e.getCause());

Review comment:
       As far as I can tell, this spot in the code is the only reason to have `OnErrorException`, and the only difference seems to be to avoid a redundant `queue.cancel` (it gets cancelled in `onError` and then again here). Is calling `queue.cancel` twice enough of a problem to warrant this?

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -41,25 +37,39 @@
  * becomes ready.
  */
 @ThreadSafe
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+public final class BufferingStreamObserver<T extends @NonNull Object> implements StreamObserver<T> {

Review comment:
       Just to make sure I'm understanding this class correctly, it is essentially working on two separate threads, right? One thread is the calls to the public API (i.e. the methods of `StreamObserver`, so `onNext`, `onCompleted`, and `onError`), while the other thread is the executor calling `drainQueue` which is the thread that actually does all the interaction with the underlying `outboundObserver`.
   
   Am I understanding this class correctly?

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
           if (value != POISON_PILL) {
             outboundObserver.onNext(value);
           } else {
+            outboundObserver.onCompleted();
             return;
           }
         }
         phaser.awaitAdvance(currentPhase);
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException(e);
+    } catch (OnErrorException e) {
+      outboundObserver.onError(e.getCause());

Review comment:
       Ack. I was just thinking that the general Exception catch already calls `outboundObserver.onError(e)`, so if you cancel  the queue with the top-level exception it would be the same. Or am I misunderstanding what happens with `getCause`? I'm reading it as getting the original exception that was nested in the `OnErrorException`, but maybe it's a more top level exception than that?
   
   Regardless, this is all nit-picking at this point. Not worth blocking approval on. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15849:
URL: https://github.com/apache/beam/pull/15849#issuecomment-956433965






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik removed a comment on pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on pull request #15849:
URL: https://github.com/apache/beam/pull/15849#issuecomment-961190690


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15849:
URL: https://github.com/apache/beam/pull/15849#discussion_r742090691



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -41,25 +37,39 @@
  * becomes ready.
  */
 @ThreadSafe
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+public final class BufferingStreamObserver<T extends @NonNull Object> implements StreamObserver<T> {

Review comment:
       Yes.
   
   The gRPC StreamObservers are typically not threadsafe across multiple threads so either you need to have threads synchronize or use a queue and have a dedicated thread writing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15849:
URL: https://github.com/apache/beam/pull/15849#issuecomment-959549983


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15849:
URL: https://github.com/apache/beam/pull/15849#issuecomment-960254457


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli commented on a change in pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #15849:
URL: https://github.com/apache/beam/pull/15849#discussion_r741625279



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
           if (value != POISON_PILL) {
             outboundObserver.onNext(value);
           } else {
+            outboundObserver.onCompleted();
             return;
           }
         }
         phaser.awaitAdvance(currentPhase);
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException(e);
+    } catch (OnErrorException e) {
+      outboundObserver.onError(e.getCause());

Review comment:
       As far as I can tell, this spot in the code is the only reason to have `OnErrorException`, and the only difference seems to be to avoid a redundant `queue.cancel` (it gets cancelled in `onError` and then again here). Is calling `queue.cancel` twice enough of a problem to warrant this?

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -41,25 +37,39 @@
  * becomes ready.
  */
 @ThreadSafe
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+public final class BufferingStreamObserver<T extends @NonNull Object> implements StreamObserver<T> {

Review comment:
       Just to make sure I'm understanding this class correctly, it is essentially working on two separate threads, right? One thread is the calls to the public API (i.e. the methods of `StreamObserver`, so `onNext`, `onCompleted`, and `onError`), while the other thread is the executor calling `drainQueue` which is the thread that actually does all the interaction with the underlying `outboundObserver`.
   
   Am I understanding this class correctly?

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
           if (value != POISON_PILL) {
             outboundObserver.onNext(value);
           } else {
+            outboundObserver.onCompleted();
             return;
           }
         }
         phaser.awaitAdvance(currentPhase);
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException(e);
+    } catch (OnErrorException e) {
+      outboundObserver.onError(e.getCause());

Review comment:
       Ack. I was just thinking that the general Exception catch already calls `outboundObserver.onError(e)`, so if you cancel  the queue with the top-level exception it would be the same. Or am I misunderstanding what happens with `getCause`? I'm reading it as getting the original exception that was nested in the `OnErrorException`, but maybe it's a more top level exception than that?
   
   Regardless, this is all nit-picking at this point. Not worth blocking approval on. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik removed a comment on pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on pull request #15849:
URL: https://github.com/apache/beam/pull/15849#issuecomment-961190690


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli commented on a change in pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #15849:
URL: https://github.com/apache/beam/pull/15849#discussion_r742306537



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
           if (value != POISON_PILL) {
             outboundObserver.onNext(value);
           } else {
+            outboundObserver.onCompleted();
             return;
           }
         }
         phaser.awaitAdvance(currentPhase);
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException(e);
+    } catch (OnErrorException e) {
+      outboundObserver.onError(e.getCause());

Review comment:
       Ack. I was just thinking that the general Exception catch already calls `outboundObserver.onError(e)`, so if you cancel  the queue with the top-level exception it would be the same. Or am I misunderstanding what happens with `getCause`? I'm reading it as getting the original exception that was nested in the `OnErrorException`, but maybe it's a more top level exception than that?
   
   Regardless, this is all nit-picking at this point. Not worth blocking approval on. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik merged pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #15849:
URL: https://github.com/apache/beam/pull/15849


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik merged pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #15849:
URL: https://github.com/apache/beam/pull/15849


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik removed a comment on pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on pull request #15849:
URL: https://github.com/apache/beam/pull/15849#issuecomment-960254457


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15849:
URL: https://github.com/apache/beam/pull/15849#issuecomment-961190690


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15849:
URL: https://github.com/apache/beam/pull/15849#issuecomment-959996548


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15849:
URL: https://github.com/apache/beam/pull/15849#issuecomment-961262291


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15849:
URL: https://github.com/apache/beam/pull/15849#discussion_r742089863



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
           if (value != POISON_PILL) {
             outboundObserver.onNext(value);
           } else {
+            outboundObserver.onCompleted();
             return;
           }
         }
         phaser.awaitAdvance(currentPhase);
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException(e);
+    } catch (OnErrorException e) {
+      outboundObserver.onError(e.getCause());

Review comment:
       The main reason is to pass through the exception from the caller so that I know that this is the case where we want to pull out the cause and not pass through the top level exception.
   
   Avoiding another cancel doesn't really do much as you already mentioned.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15849:
URL: https://github.com/apache/beam/pull/15849#discussion_r742089863



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
           if (value != POISON_PILL) {
             outboundObserver.onNext(value);
           } else {
+            outboundObserver.onCompleted();
             return;
           }
         }
         phaser.awaitAdvance(currentPhase);
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException(e);
+    } catch (OnErrorException e) {
+      outboundObserver.onError(e.getCause());

Review comment:
       The main reason is to pass through the exception from the caller so that I know that this is the case where we want to pull out the cause and not pass through the top level exception.
   
   Avoiding another cancel doesn't really do much as you already mentioned.

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -41,25 +37,39 @@
  * becomes ready.
  */
 @ThreadSafe
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+public final class BufferingStreamObserver<T extends @NonNull Object> implements StreamObserver<T> {

Review comment:
       Yes.
   
   The gRPC StreamObservers are typically not threadsafe across multiple threads so either you need to have threads synchronize or use a queue and have a dedicated thread writing.

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -41,25 +37,39 @@
  * becomes ready.
  */
 @ThreadSafe
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+public final class BufferingStreamObserver<T extends @NonNull Object> implements StreamObserver<T> {

Review comment:
       Yes.
   
   The gRPC StreamObservers are not threadsafe across multiple threads so either you need to have threads synchronize or use a queue and have a dedicated thread writing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik merged pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #15849:
URL: https://github.com/apache/beam/pull/15849


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15849:
URL: https://github.com/apache/beam/pull/15849#issuecomment-958182272


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli commented on a change in pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #15849:
URL: https://github.com/apache/beam/pull/15849#discussion_r741625279



##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -73,94 +83,58 @@ private void drainQueue() {
           if (value != POISON_PILL) {
             outboundObserver.onNext(value);
           } else {
+            outboundObserver.onCompleted();
             return;
           }
         }
         phaser.awaitAdvance(currentPhase);
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException(e);
+    } catch (OnErrorException e) {
+      outboundObserver.onError(e.getCause());

Review comment:
       As far as I can tell, this spot in the code is the only reason to have `OnErrorException`, and the only difference seems to be to avoid a redundant `queue.cancel` (it gets cancelled in `onError` and then again here). Is calling `queue.cancel` twice enough of a problem to warrant this?

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
##########
@@ -41,25 +37,39 @@
  * becomes ready.
  */
 @ThreadSafe
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+public final class BufferingStreamObserver<T extends @NonNull Object> implements StreamObserver<T> {

Review comment:
       Just to make sure I'm understanding this class correctly, it is essentially working on two separate threads, right? One thread is the calls to the public API (i.e. the methods of `StreamObserver`, so `onNext`, `onCompleted`, and `onError`), while the other thread is the executor calling `drainQueue` which is the thread that actually does all the interaction with the underlying `outboundObserver`.
   
   Am I understanding this class correctly?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15849:
URL: https://github.com/apache/beam/pull/15849#issuecomment-959549983






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15849:
URL: https://github.com/apache/beam/pull/15849#issuecomment-956433965


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #15849: [BEAM-3811] Code clean-up of the CancelleableQueue to not throw InterruptedException on cancel/reset since we are never stuck in a loop here or waiting on a signal.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15849:
URL: https://github.com/apache/beam/pull/15849#issuecomment-959549983






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org