Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/18 18:57:40 UTC

[GitHub] [beam] jrmccluskey opened a new pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

jrmccluskey opened a new pull request #15743:
URL: https://github.com/apache/beam/pull/15743


   Implements the SDK harness's default behavior for mapping main input windows into the side input window space for side input requests. Adds an interface and a unit-tested back-end type that handle the mapping for the SideInputAdapter, plus the encoding/decoding needed to pipe the target windowFn through. The same mechanisms could eventually be used to support custom WindowMappingFns from users.
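
As a rough illustration of what mapping a main input window into side input window space can look like, the sketch below assumes the rule "pick the fixed side input window that contains the main window's maximum timestamp". The `window` type and the `mapToFixed` helper are hypothetical names invented for this example and are not the types added in this PR; timestamps are plain milliseconds.

```go
package main

import "fmt"

// window is a hypothetical half-open interval [Start, End) in milliseconds.
type window struct {
	Start, End int64
}

// maxTimestamp returns the last timestamp that still belongs to the window.
func (w window) maxTimestamp() int64 { return w.End - 1 }

// mapToFixed returns the fixed window of the given size (in milliseconds)
// that contains the main window's maximum timestamp. This rule is an
// assumption made for the sketch.
func mapToFixed(mainW window, size int64) window {
	ts := mainW.maxTimestamp()
	// Floor the timestamp to a multiple of size; the extra terms keep the
	// result correct for negative timestamps.
	start := ts - ((ts%size)+size)%size
	return window{Start: start, End: start + size}
}

func main() {
	mainW := window{Start: 2000, End: 3000} // a 1s main input window
	fmt.Println(mapToFixed(mainW, 1000))    // side input uses same-sized windows
	fmt.Println(mapToFixed(mainW, 10000))   // side input uses larger windows
}
```

With same-sized fixed windows the main window maps onto itself, while a larger side window absorbs several main windows. That matches the intuition behind the "Fixed-Same" and "Fixed-Big" cases in the validation test.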
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jrmccluskey commented on pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on pull request #15743:
URL: https://github.com/apache/beam/pull/15743#issuecomment-947064122


   Run Go PostCommit





[GitHub] [beam] jrmccluskey commented on pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on pull request #15743:
URL: https://github.com/apache/beam/pull/15743#issuecomment-946112732


   Run Go Samza ValidatesRunner





[GitHub] [beam] lostluck commented on pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15743:
URL: https://github.com/apache/beam/pull/15743#issuecomment-948886943


   Ah very good find. Thank you.
   
   On Thu, Oct 21, 2021, 11:10 AM Lukasz Cwik ***@***.***> wrote:
   
   > It looks like the addition of TestValidateWindowedSideInputs bumped us
   > above the 1h timeout, so the Go PostCommit will time out more often
   > (for example https://ci-beam.apache.org/job/beam_PostCommit_Go/9019/).
   >
   > It used to be that the run would take ~55mins.
   >
   > I checked the associated job (
   > https://console.cloud.google.com/dataflow/jobs/us-central1/2021-10-21_06_01_28-15917691559290441824?project=apache-beam-testing)
   > and it passed after about 8 mins, so ~55 + ~8 > 1h.
   >
   > Filed https://issues.apache.org/jira/browse/BEAM-13096
   





[GitHub] [beam] jrmccluskey commented on a change in pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on a change in pull request #15743:
URL: https://github.com/apache/beam/pull/15743#discussion_r732218275



##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -93,6 +94,41 @@ func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
 
+// ValidateWindowedSideInputs checks that side inputs have accurate windowing information when used.
+func ValidateWindowedSideInputs(s beam.Scope) {
+	timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{1, 2, 3}}, beam.Impulse(s))
+
+	timestampedData = beam.DropKey(s, timestampedData)
+
+	windowSize := 1 * time.Second
+
+	validateSums := func(s beam.Scope, wfn, sideFn *window.Fn, in, side beam.PCollection, expected ...interface{}) {
+		wData := beam.WindowInto(s, wfn, in)
+		wSide := beam.WindowInto(s, sideFn, side)
+
+		sums := beam.ParDo(s, sumSideInputs, wData, beam.SideInput{Input: wSide})
+
+		sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+
+		passert.Equals(s, sums, expected...)
+	}
+
+	validateSums(s.Scope("Fixed-Global"), window.NewFixedWindows(windowSize), window.NewGlobalWindows(), timestampedData, timestampedData, 7, 8, 9)
+	validateSums(s.Scope("Fixed-Same"), window.NewFixedWindows(windowSize), window.NewFixedWindows(windowSize), timestampedData, timestampedData, 2, 4, 6)
+	validateSums(s.Scope("Fixed-Big"), window.NewFixedWindows(windowSize), window.NewFixedWindows(10*time.Second), timestampedData, timestampedData, 7, 8, 9)
+	validateSums(s.Scope("Fixed-Sliding"), window.NewFixedWindows(windowSize), window.NewSlidingWindows(windowSize, 2*windowSize), timestampedData, timestampedData, 7, 4, 6)
+	validateSums(s.Scope("Sliding-Fixed"), window.NewSlidingWindows(windowSize, 2*windowSize), window.NewFixedWindows(windowSize), timestampedData, timestampedData, 2, 3, 4, 5, 6, 3)

Review comment:
       Updated







[GitHub] [beam] jrmccluskey commented on a change in pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on a change in pull request #15743:
URL: https://github.com/apache/beam/pull/15743#discussion_r732807187



##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -55,7 +55,7 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
 		sideNode := s.Input.n
 		sideWfn := sideNode.WindowingStrategy().Fn
 		if sideWfn.Kind == window.Sessions {
-			return nil, fmt.Errorf("error with side input %d in DoFn %v: PCollections using merging WindowFns are not supported as side inputs. Consider re-windowing the side input PCollection before use", sideNode, fn)
+			return nil, fmt.Errorf("error with side input %v in DoFn %v: PCollections using merging WindowFns are not supported as side inputs. Consider re-windowing the side input PCollection before use", sideNode, fn)
 		}
 		if (inWfn.Kind == window.GlobalWindows) && (sideWfn.Kind != window.GlobalWindows) {
 			return nil, fmt.Errorf("main input %v is global windowed in DoFn %v but side input %v is not, cannot map windows correctly. Consider re-windowing the side input PCOllection before use", col.n, fn, sideNode)

Review comment:
       Ah I misunderstood that one. Replaced node printout with index







[GitHub] [beam] lukecwik commented on pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15743:
URL: https://github.com/apache/beam/pull/15743#issuecomment-948878641


   It looks like the addition of TestValidateWindowedSideInputs bumped us above the 1h timeout, so the Go PostCommit will time out more often (for example https://ci-beam.apache.org/job/beam_PostCommit_Go/9019/).
   
   It used to be that the run would take ~55mins.
   
   I checked the associated job (https://console.cloud.google.com/dataflow/jobs/us-central1/2021-10-21_06_01_28-15917691559290441824?project=apache-beam-testing) and it passed after about 8 mins, so `~55 + ~8 > 1h`.
   
   Filed https://issues.apache.org/jira/browse/BEAM-13096





[GitHub] [beam] lostluck commented on a change in pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15743:
URL: https://github.com/apache/beam/pull/15743#discussion_r731335377



##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -49,7 +50,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
 	}
 
 	in := []*graph.Node{col.n}
+	inWfn := col.n.WindowingStrategy().Fn
 	for _, s := range side {
+		sideNode := s.Input.n
+		sideWfn := sideNode.WindowingStrategy().Fn
+		if sideWfn.Kind == window.Sessions {
+			return nil, fmt.Errorf("side input %v is session-windowed, which is not supported", sideNode.String())

Review comment:
       fmt is aware of the Stringer interface, so you don't need to explicitly call a `String()` method as input into a fmt call.
   ```suggestion
   			return nil, fmt.Errorf("side input %v is session-windowed, which is not supported", sideNode)
   ```

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -49,7 +50,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
 	}
 
 	in := []*graph.Node{col.n}
+	inWfn := col.n.WindowingStrategy().Fn
 	for _, s := range side {
+		sideNode := s.Input.n
+		sideWfn := sideNode.WindowingStrategy().Fn
+		if sideWfn.Kind == window.Sessions {
+			return nil, fmt.Errorf("side input %v is session-windowed, which is not supported", sideNode.String())

Review comment:
       The property that prevents session windows from working properly is that Sessions is a merging WindowFn. We can't check this property directly right now, but we can begin to educate users about it.
   
   "error with SideInput %d: PCollections using Merging WindowFns are not supported as side inputs"
   
   Or something similar. So that adds that it's the `i`th side input parameter that was the problem, but we can do better.
   
   We should also add further context: 
   Which user DoFn was the user adding to the graph, that has the bad side input? 
   
   We can also suggest that users re-window the PCollection to a non-merging WindowFn before using it as a side input. As a rule, users love it when the error that impedes them suggests a solution to the problem.
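
A sketch of the kind of error this comment asks for might look like the following. All of the types and names (`windowKind`, `sideInput`, `checkSideInputs`) are hypothetical and exist only for illustration. The point is that the message carries the side input index, the DoFn being added, and a suggested fix.

```go
package main

import "fmt"

// windowKind and sideInput are hypothetical stand-ins for illustration only.
type windowKind string

const (
	fixedWindows windowKind = "fixed"
	sessions     windowKind = "sessions"
)

type sideInput struct {
	name string
	kind windowKind
}

// isMerging reports whether the windowing merges windows.
// In this sketch only sessions are treated as merging.
func (k windowKind) isMerging() bool { return k == sessions }

// checkSideInputs returns an error that names the index of the offending
// side input, the DoFn being added, and a suggested fix.
func checkSideInputs(dofn string, side []sideInput) error {
	for i, s := range side {
		if s.kind.isMerging() {
			return fmt.Errorf("error with side input %d (%v) in DoFn %v: PCollections using merging WindowFns are not supported as side inputs. Consider re-windowing the side input PCollection before use", i, s.name, dofn)
		}
	}
	return nil
}

func main() {
	err := checkSideInputs("sumSideInputs", []sideInput{
		{name: "lookup", kind: fixedWindows},
		{name: "clicks", kind: sessions},
	})
	fmt.Println(err)
}
```

Reporting the index rather than the node printout keeps the message short and points the user directly at the offending parameter position.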

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -49,7 +50,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
 	}
 
 	in := []*graph.Node{col.n}
+	inWfn := col.n.WindowingStrategy().Fn
 	for _, s := range side {
+		sideNode := s.Input.n
+		sideWfn := sideNode.WindowingStrategy().Fn
+		if sideWfn.Kind == window.Sessions {
+			return nil, fmt.Errorf("side input %v is session-windowed, which is not supported", sideNode.String())
+		}
+		if (inWfn.Kind == window.GlobalWindows) && (sideWfn.Kind != window.GlobalWindows) {
+			return nil, fmt.Errorf("main input %v is global windowed but side input %v is not, cannot map windows correctly", col.n.String(), sideNode.String())

Review comment:
       ```suggestion
   			return nil, fmt.Errorf("main input %v is global windowed but side input %v is not, cannot map windows correctly", col.n, sideNode)
   ```

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -93,6 +94,47 @@ func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
 
+// ValidateWindowedSideInputs checks that side inputs have accurate windowing information when used.
+func ValidateWindowedSideInputs(s beam.Scope) {
+	timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{1, 2, 3}}, beam.Impulse(s))
+	timestampedSide := beam.ParDo(s, &createTimestampedData{Data: []int{1, 2, 3}}, beam.Impulse(s))
+
+	timestampedData = beam.DropKey(s, timestampedData)
+	timestampedSide = beam.DropKey(s, timestampedSide)
+
+	_ = timestampedSide
+
+	windowSize := 1 * time.Second
+
+	validateSums := func(s beam.Scope, wfn, sideFn *window.Fn, in, side beam.PCollection, expected ...interface{}) {
+		wData := beam.WindowInto(s, wfn, in)
+		wSide := beam.WindowInto(s, sideFn, side)
+
+		sums := beam.ParDo(s, sumSideInputs, wData, beam.SideInput{Input: wSide})
+
+		sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+
+		passert.Equals(s, sums, expected...)
+	}
+
+	// This works.

Review comment:
       We likely should get rid of the commentary here, since everything now works. ;)

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -93,6 +94,47 @@ func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
 
+// ValidateWindowedSideInputs checks that side inputs have accurate windowing information when used.
+func ValidateWindowedSideInputs(s beam.Scope) {
+	timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{1, 2, 3}}, beam.Impulse(s))
+	timestampedSide := beam.ParDo(s, &createTimestampedData{Data: []int{1, 2, 3}}, beam.Impulse(s))
+
+	timestampedData = beam.DropKey(s, timestampedData)
+	timestampedSide = beam.DropKey(s, timestampedSide)
+
+	_ = timestampedSide
+
+	windowSize := 1 * time.Second
+
+	validateSums := func(s beam.Scope, wfn, sideFn *window.Fn, in, side beam.PCollection, expected ...interface{}) {
+		wData := beam.WindowInto(s, wfn, in)
+		wSide := beam.WindowInto(s, sideFn, side)
+
+		sums := beam.ParDo(s, sumSideInputs, wData, beam.SideInput{Input: wSide})
+
+		sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+
+		passert.Equals(s, sums, expected...)
+	}
+
+	// This works.
+	validateSums(s.Scope("Fixed-Global"), window.NewFixedWindows(windowSize), window.NewGlobalWindows(), timestampedData, timestampedData, 7, 8, 9)
+	// So does this.
+	validateSums(s.Scope("Fixed-Same"), window.NewFixedWindows(windowSize), window.NewFixedWindows(windowSize), timestampedData, timestampedData, 2, 4, 6)
+
+	// Thise doesn't
+	validateSums(s.Scope("Fixed-Big"), window.NewFixedWindows(windowSize), window.NewFixedWindows(10*time.Second), timestampedData, timestampedSide, 7, 8, 9)

Review comment:
       Please also add Sliding-Fixed and a Fixed-Sliding case, so we can be sure the code gets exercised fully.

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -49,7 +50,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
 	}
 
 	in := []*graph.Node{col.n}
+	inWfn := col.n.WindowingStrategy().Fn
 	for _, s := range side {
+		sideNode := s.Input.n
+		sideWfn := sideNode.WindowingStrategy().Fn
+		if sideWfn.Kind == window.Sessions {
+			return nil, fmt.Errorf("side input %v is session-windowed, which is not supported", sideNode.String())
+		}
+		if (inWfn.Kind == window.GlobalWindows) && (sideWfn.Kind != window.GlobalWindows) {
+			return nil, fmt.Errorf("main input %v is global windowed but side input %v is not, cannot map windows correctly", col.n.String(), sideNode.String())

Review comment:
       Same improvements to the error message here too: DoFn, which SideInput Parameter had the problem.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jrmccluskey commented on pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on pull request #15743:
URL: https://github.com/apache/beam/pull/15743#issuecomment-947055123


   Run Go PreCommit





[GitHub] [beam] lostluck commented on a change in pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15743:
URL: https://github.com/apache/beam/pull/15743#discussion_r732273445



##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -55,7 +55,7 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
 		sideNode := s.Input.n
 		sideWfn := sideNode.WindowingStrategy().Fn
 		if sideWfn.Kind == window.Sessions {
-			return nil, fmt.Errorf("error with side input %d in DoFn %v: PCollections using merging WindowFns are not supported as side inputs. Consider re-windowing the side input PCollection before use", sideNode, fn)
+			return nil, fmt.Errorf("error with side input %v in DoFn %v: PCollections using merging WindowFns are not supported as side inputs. Consider re-windowing the side input PCollection before use", sideNode, fn)
 		}
 		if (inWfn.Kind == window.GlobalWindows) && (sideWfn.Kind != window.GlobalWindows) {
 			return nil, fmt.Errorf("main input %v is global windowed in DoFn %v but side input %v is not, cannot map windows correctly. Consider re-windowing the side input PCOllection before use", col.n, fn, sideNode)

Review comment:
       So the sideNode says what Type and such is being used, but doesn't say "this is the 2nd side input into the DoFn", which can be retrieved by changing the for loop to give you the index, à la: `for i, s := range side {`
   
   The type information is useful, but most of the rest of the node printout information isn't useful for users to find out which side input is the problem. The types aren't very visible to users at construction time, but "this is the 3rd side input" would be.   Hence the earlier phrasing of  "side input %d"
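
A small sketch of the index-based loop described above. The `sideInput` struct and `firstMergingSideInput` function are hypothetical simplifications; in `TryParDo` the check would be on the side input's WindowFn kind rather than a boolean field.

```go
package main

import "fmt"

// sideInput is a hypothetical, simplified stand-in for a side input option.
type sideInput struct {
	name    string
	merging bool // assumption: true when the side input uses a merging WindowFn
}

// firstMergingSideInput ranges with an index so the position of the offending
// side input can be reported. It returns -1 when every side input is fine.
func firstMergingSideInput(side []sideInput) int {
	for i, s := range side {
		if s.merging {
			return i
		}
	}
	return -1
}

func main() {
	side := []sideInput{{name: "lookup"}, {name: "sessions", merging: true}}
	if i := firstMergingSideInput(side); i >= 0 {
		fmt.Printf("error with side input %d (%s)\n", i, side[i].name)
	}
}
```

With the index available, `%d` in the error string has an integer to format instead of the whole node.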







[GitHub] [beam] lostluck commented on a change in pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15743:
URL: https://github.com/apache/beam/pull/15743#discussion_r732027485



##########
File path: sdks/go/pkg/beam/core/runtime/exec/window.go
##########
@@ -96,3 +96,23 @@ func (w *WindowInto) Down(ctx context.Context) error {
 func (w *WindowInto) String() string {
 	return fmt.Sprintf("WindowInto[%v]. Out:%v", w.Fn, w.Out.ID())
 }
+
+// WindowMapper defines an interface maps windows from a main input window space
+// to windows from a side input window space. Used during side input materialization.
+type WindowMapper interface {
+	MapWindow(w typex.Window) (typex.Window, error)
+}
+
+type windowMapper struct {
+	wfn *window.Fn
+}
+
+func (f *windowMapper) MapWindow(w typex.Window) (typex.Window, error) {
+	candidates := assignWindows(f.wfn, w.MaxTimestamp())
+	if len(candidates) == 0 {
+		return nil, fmt.Errorf("failed to map main input window to side input window with WindowFn %v", f.wfn.String())
+	}
+	// Return latest candidate window in terms of event time (only relevant for sliding windows)
+	// Sliding windows append the latest window first in assignWindows.
+	return candidates[0], nil

Review comment:
       This is returning the 1st candidate. Is this correct? Shouldn't it be the last candidate, `candidates[len(candidates)-1]`?
   
   Python uses the last candidate....
   https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/sideinputs.py#L65
   
   And generates them like so, https://github.com/apache/beam/blob/aa4edda39ceb8d7a80f56bd37caa6233dba7de5d/sdks/python/apache_beam/transforms/window.py#L494 
   
   which matches how we assign them in Go: https://github.com/apache/beam/blob/aa4edda39ceb8d7a80f56bd37caa6233dba7de5d/sdks/go/pkg/beam/core/runtime/exec/window.go#L72
   
   Java also does the same thing:
   https://github.com/apache/beam/blob/aa4edda39ceb8d7a80f56bd37caa6233dba7de5d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java#L111
   
   But statically constructs the window. https://github.com/apache/beam/blob/aa4edda39ceb8d7a80f56bd37caa6233dba7de5d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java#L133
   
   calling it the "earliest window" instead of the "latest" window.
   
   The bit that tipped me off to the inconsistency is that in the unit test you have, the side input window ends very much later than the fixed window, which doesn't make sense processing wise: Why wait for additional later data and delay main input processing until the watermark passes that later time?
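
One way to sidestep the first-versus-last ambiguity is to select the window by event time rather than by slice position. The sketch below is an assumption-laden illustration, not the SDK's code: `interval`, `slidingWindows`, and `earliestWindow` are hypothetical names, timestamps are plain milliseconds, and offsets are ignored.

```go
package main

import "fmt"

// interval is a half-open event-time window [Start, End) in milliseconds.
// It is a hypothetical stand-in, not Beam's window.IntervalWindow.
type interval struct{ Start, End int64 }

// slidingWindows lists every window of the given period and size that
// contains ts, latest start first. It assumes ts >= 0 to keep the modulo
// arithmetic simple.
func slidingWindows(ts, period, size int64) []interval {
	var ws []interval
	lastStart := ts - ts%period
	// A window [start, start+size) contains ts exactly when start > ts-size.
	for start := lastStart; start > ts-size; start -= period {
		ws = append(ws, interval{Start: start, End: start + size})
	}
	return ws
}

// earliestWindow picks the window with the smallest Start, so the result
// does not depend on the order in which the candidates were generated.
func earliestWindow(ws []interval) (interval, bool) {
	if len(ws) == 0 {
		return interval{}, false
	}
	best := ws[0]
	for _, w := range ws[1:] {
		if w.Start < best.Start {
			best = w
		}
	}
	return best, true
}

func main() {
	// Main window [0, 600) has a maximum timestamp of 599.
	candidates := slidingWindows(599, 500, 1000)
	w, _ := earliestWindow(candidates)
	fmt.Println(candidates, w) // prints [{500 1500} {0 1000}] {0 1000}
}
```

For a main window of [0, 600) and sliding windows of period 500 and size 1000, this selects [0, 1000), the earliest window that still contains the end of the main window.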

##########
File path: sdks/go/pkg/beam/core/runtime/exec/window_test.go
##########
@@ -113,3 +113,47 @@ func TestAssignWindow(t *testing.T) {
 		}
 	}
 }
+
+func TestMapWindow(t *testing.T) {
+	tests := []struct {
+		name     string
+		wfn      *window.Fn
+		in       typex.Window
+		expected typex.Window
+	}{
+		{
+			"interval to global",
+			window.NewGlobalWindows(),
+			window.IntervalWindow{Start: 0, End: 1000},
+			window.GlobalWindow{},
+		},
+		{
+			"global to global",
+			window.NewGlobalWindows(),
+			window.GlobalWindow{},
+			window.GlobalWindow{},
+		},
+		{
+			"interval to interval",
+			window.NewFixedWindows(1000 * time.Millisecond),
+			window.IntervalWindow{Start: 0, End: 100},
+			window.IntervalWindow{Start: 0, End: 1000},
+		},
+		{
+			"interval to sliding",
+			window.NewSlidingWindows(500*time.Millisecond, 1000*time.Millisecond),
+			window.IntervalWindow{Start: 0, End: 600},
+			window.IntervalWindow{Start: 500, End: 1500},

Review comment:
       The "earliest" window should be 0-1000 here I think.
   
   Since this one is trickier, I suggest we copy the testing values that Java's unit test uses (minus the offsets, which we don't support at present)
   
   https://github.com/apache/beam/blob/4b7b74673b647c8d964b4877a8d66d47096acce4/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java#L175

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -93,6 +94,41 @@ func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
 
+// ValidateWindowedSideInputs checks that side inputs have accurate windowing information when used.
+func ValidateWindowedSideInputs(s beam.Scope) {
+	timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{1, 2, 3}}, beam.Impulse(s))
+
+	timestampedData = beam.DropKey(s, timestampedData)
+
+	windowSize := 1 * time.Second
+
+	validateSums := func(s beam.Scope, wfn, sideFn *window.Fn, in, side beam.PCollection, expected ...interface{}) {
+		wData := beam.WindowInto(s, wfn, in)
+		wSide := beam.WindowInto(s, sideFn, side)
+
+		sums := beam.ParDo(s, sumSideInputs, wData, beam.SideInput{Input: wSide})
+
+		sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+
+		passert.Equals(s, sums, expected...)
+	}
+
+	validateSums(s.Scope("Fixed-Global"), window.NewFixedWindows(windowSize), window.NewGlobalWindows(), timestampedData, timestampedData, 7, 8, 9)
+	validateSums(s.Scope("Fixed-Same"), window.NewFixedWindows(windowSize), window.NewFixedWindows(windowSize), timestampedData, timestampedData, 2, 4, 6)
+	validateSums(s.Scope("Fixed-Big"), window.NewFixedWindows(windowSize), window.NewFixedWindows(10*time.Second), timestampedData, timestampedData, 7, 8, 9)
+	validateSums(s.Scope("Fixed-Sliding"), window.NewFixedWindows(windowSize), window.NewSlidingWindows(windowSize, 2*windowSize), timestampedData, timestampedData, 7, 4, 6)
+	validateSums(s.Scope("Sliding-Fixed"), window.NewSlidingWindows(windowSize, 2*windowSize), window.NewFixedWindows(windowSize), timestampedData, timestampedData, 2, 3, 4, 5, 6, 3)

Review comment:
       Let me check that I understand what's going on with these sums. We should probably add a clarifying comment for them, as they are harder to figure out quickly than the plain fixed ones.
   
   For Fixed-Sliding
   Main: With window size 1, each window contains 1 element (1, 2, 3)
   Side: window size 2, with a new window starting every 1 (the period). So we have [1], [1,2], [2,3], [3]
   So what gets computed here should be with earliest windows:
   (1, [1])  = 2
   (2, [1, 2]) = 5
   (3, [2, 3]) = 8
   
   What we have here does match what's implemented at least (latest windows).
   (1, [1, 2])  = 4
   (2, [2, 3]) = 7
   (3, [3]) = 6
   
   For Sliding-Fixed:
   We have
   ([1], [1]) = 2
   ([1, 2], [2]) = 3, 4
   ([2, 3], [3]) = 5, 6
   ([3], [] ) = 3
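
The Fixed-Sliding arithmetic above can be reproduced with a short sketch. It rests on assumptions not stated in the thread: each element `v` is taken to have an event timestamp of `v` seconds, `sumSideInputs` is taken to add the main element to the sum of the side input, and `sideWindow` and `expectedSums` are hypothetical helper names.

```go
package main

import "fmt"

// win is a half-open event-time interval [start, end) in milliseconds.
type win struct{ start, end int64 }

// sideWindow returns a sliding window (period, size) containing ts: the
// earliest-starting one when earliest is true, otherwise the latest.
// It assumes ts >= 0.
func sideWindow(ts, period, size int64, earliest bool) win {
	start := ts - ts%period // latest window start containing ts
	if earliest {
		for start-period > ts-size {
			start -= period
		}
	}
	return win{start, start + size}
}

// expectedSums adds each main element to the sum of the side elements that
// fall inside the side window mapped from the main element's 1s fixed window.
func expectedSums(data []int64, earliest bool) []int64 {
	const sec = int64(1000)
	var out []int64
	for _, v := range data {
		// Assumption: element v has timestamp v seconds, so its fixed
		// window is [v*sec, (v+1)*sec) and the max timestamp is 1ms less.
		maxTS := (v+1)*sec - 1
		w := sideWindow(maxTS, sec, 2*sec, earliest)
		sum := v
		for _, s := range data {
			if ts := s * sec; ts >= w.start && ts < w.end {
				sum += s
			}
		}
		out = append(out, sum)
	}
	return out
}

func main() {
	fmt.Println(expectedSums([]int64{1, 2, 3}, true))  // earliest windows: [2 5 8]
	fmt.Println(expectedSums([]int64{1, 2, 3}, false)) // latest windows: [4 7 6]
}
```

Under these assumptions the earliest-window mapping gives 2, 5, 8, while the latest-window mapping gives 4, 7, 6, which are the same values as the 7, 4, 6 currently expected by the test.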
   
   
   







[GitHub] [beam] jrmccluskey commented on a change in pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on a change in pull request #15743:
URL: https://github.com/apache/beam/pull/15743#discussion_r732000317



##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -93,6 +94,47 @@ func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
 
+// ValidateWindowedSideInputs checks that side inputs have accurate windowing information when used.
+func ValidateWindowedSideInputs(s beam.Scope) {
+	timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{1, 2, 3}}, beam.Impulse(s))
+	timestampedSide := beam.ParDo(s, &createTimestampedData{Data: []int{1, 2, 3}}, beam.Impulse(s))
+
+	timestampedData = beam.DropKey(s, timestampedData)
+	timestampedSide = beam.DropKey(s, timestampedSide)
+
+	_ = timestampedSide
+
+	windowSize := 1 * time.Second
+
+	validateSums := func(s beam.Scope, wfn, sideFn *window.Fn, in, side beam.PCollection, expected ...interface{}) {
+		wData := beam.WindowInto(s, wfn, in)
+		wSide := beam.WindowInto(s, sideFn, side)
+
+		sums := beam.ParDo(s, sumSideInputs, wData, beam.SideInput{Input: wSide})
+
+		sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+
+		passert.Equals(s, sums, expected...)
+	}
+
+	// This works.
+	validateSums(s.Scope("Fixed-Global"), window.NewFixedWindows(windowSize), window.NewGlobalWindows(), timestampedData, timestampedData, 7, 8, 9)
+	// So does this.
+	validateSums(s.Scope("Fixed-Same"), window.NewFixedWindows(windowSize), window.NewFixedWindows(windowSize), timestampedData, timestampedData, 2, 4, 6)
+
+	// Thise doesn't
+	validateSums(s.Scope("Fixed-Big"), window.NewFixedWindows(windowSize), window.NewFixedWindows(10*time.Second), timestampedData, timestampedSide, 7, 8, 9)

Review comment:
       Done

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -93,6 +94,47 @@ func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
 
+// ValidateWindowedSideInputs checks that side inputs have accurate windowing information when used.
+func ValidateWindowedSideInputs(s beam.Scope) {
+	timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{1, 2, 3}}, beam.Impulse(s))
+	timestampedSide := beam.ParDo(s, &createTimestampedData{Data: []int{1, 2, 3}}, beam.Impulse(s))
+
+	timestampedData = beam.DropKey(s, timestampedData)
+	timestampedSide = beam.DropKey(s, timestampedSide)
+
+	_ = timestampedSide
+
+	windowSize := 1 * time.Second
+
+	validateSums := func(s beam.Scope, wfn, sideFn *window.Fn, in, side beam.PCollection, expected ...interface{}) {
+		wData := beam.WindowInto(s, wfn, in)
+		wSide := beam.WindowInto(s, sideFn, side)
+
+		sums := beam.ParDo(s, sumSideInputs, wData, beam.SideInput{Input: wSide})
+
+		sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+
+		passert.Equals(s, sums, expected...)
+	}
+
+	// This works.

Review comment:
       Done







[GitHub] [beam] lostluck merged pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

Posted by GitBox <gi...@apache.org>.
lostluck merged pull request #15743:
URL: https://github.com/apache/beam/pull/15743


   





[GitHub] [beam] jrmccluskey commented on pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on pull request #15743:
URL: https://github.com/apache/beam/pull/15743#issuecomment-946094370









[GitHub] [beam] jrmccluskey commented on a change in pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on a change in pull request #15743:
URL: https://github.com/apache/beam/pull/15743#discussion_r732174566



##########
File path: sdks/go/pkg/beam/core/runtime/exec/window_test.go
##########
@@ -113,3 +113,47 @@ func TestAssignWindow(t *testing.T) {
 		}
 	}
 }
+
+func TestMapWindow(t *testing.T) {
+	tests := []struct {
+		name     string
+		wfn      *window.Fn
+		in       typex.Window
+		expected typex.Window
+	}{
+		{
+			"interval to global",
+			window.NewGlobalWindows(),
+			window.IntervalWindow{Start: 0, End: 1000},
+			window.GlobalWindow{},
+		},
+		{
+			"global to global",
+			window.NewGlobalWindows(),
+			window.GlobalWindow{},
+			window.GlobalWindow{},
+		},
+		{
+			"interval to interval",
+			window.NewFixedWindows(1000 * time.Millisecond),
+			window.IntervalWindow{Start: 0, End: 100},
+			window.IntervalWindow{Start: 0, End: 1000},
+		},
+		{
+			"interval to sliding",
+			window.NewSlidingWindows(500*time.Millisecond, 1000*time.Millisecond),
+			window.IntervalWindow{Start: 0, End: 600},
+			window.IntervalWindow{Start: 500, End: 1500},

Review comment:
       Fixed, along with the assignWindows bug in the sliding windows case.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/window.go
##########
@@ -96,3 +96,23 @@ func (w *WindowInto) Down(ctx context.Context) error {
 func (w *WindowInto) String() string {
 	return fmt.Sprintf("WindowInto[%v]. Out:%v", w.Fn, w.Out.ID())
 }
+
+// WindowMapper defines an interface maps windows from a main input window space
+// to windows from a side input window space. Used during side input materialization.
+type WindowMapper interface {
+	MapWindow(w typex.Window) (typex.Window, error)
+}
+
+type windowMapper struct {
+	wfn *window.Fn
+}
+
+func (f *windowMapper) MapWindow(w typex.Window) (typex.Window, error) {
+	candidates := assignWindows(f.wfn, w.MaxTimestamp())
+	if len(candidates) == 0 {
+		return nil, fmt.Errorf("failed to map main input window to side input window with WindowFn %v", f.wfn.String())
+	}
+	// Return latest candidate window in terms of event time (only relevant for sliding windows)
+	// Sliding windows append the latest window first in assignWindows.
+	return candidates[0], nil

Review comment:
       Fixed







[GitHub] [beam] jrmccluskey commented on a change in pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on a change in pull request #15743:
URL: https://github.com/apache/beam/pull/15743#discussion_r732062482



##########
File path: sdks/go/pkg/beam/core/runtime/exec/window.go
##########
@@ -96,3 +96,23 @@ func (w *WindowInto) Down(ctx context.Context) error {
 func (w *WindowInto) String() string {
 	return fmt.Sprintf("WindowInto[%v]. Out:%v", w.Fn, w.Out.ID())
 }
+
+// WindowMapper defines an interface maps windows from a main input window space
+// to windows from a side input window space. Used during side input materialization.
+type WindowMapper interface {
+	MapWindow(w typex.Window) (typex.Window, error)
+}
+
+type windowMapper struct {
+	wfn *window.Fn
+}
+
+func (f *windowMapper) MapWindow(w typex.Window) (typex.Window, error) {
+	candidates := assignWindows(f.wfn, w.MaxTimestamp())
+	if len(candidates) == 0 {
+		return nil, fmt.Errorf("failed to map main input window to side input window with WindowFn %v", f.wfn.String())
+	}
+	// Return latest candidate window in terms of event time (only relevant for sliding windows)
+	// Sliding windows append the latest window first in assignWindows.
+	return candidates[0], nil

Review comment:
       So Python's "last candidate" is taken from a list that is also generated backwards. Got it. Explicitly calling out the behavior in terms of event time is much clearer.
   
   The current implementation, which takes the first element in the slice, matches the comment and the intent as I understood it at the time, but fixing it isn't a big issue (apart from updating the tests to reflect that difference).







[GitHub] [beam] jrmccluskey commented on pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on pull request #15743:
URL: https://github.com/apache/beam/pull/15743#issuecomment-946098257


   Run Go Flink ValidatesRunner

