Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/09/27 15:53:43 UTC

[GitHub] [beam] jrmccluskey opened a new pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

jrmccluskey opened a new pull request #15594:
URL: https://github.com/apache/beam/pull/15594


   Sets valid cache tokens for each process bundle request and queries the cache when materializing side inputs. Modifies the SideInputAdapter interface to expose the side input and transform IDs needed for the lookup, and updates the testing and direct runner implementations to match. 
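A minimal sketch of the token-gated lookup described above, assuming a simplified in-memory cache. The names `tokenCache`, `SetValidTokens`, `QueryCache`, and `SetCache` are hypothetical here and are not presented as the actual `statecache` API. Real tokens arrive as `ProcessBundleRequest_CacheToken` protos rather than strings, and a real cache would also need eviction.

```go
package main

import (
	"fmt"
	"sync"
)

// cacheKey identifies a side input by (transform ID, side input ID).
type cacheKey struct{ transformID, sideInputID string }

// tokenCache is a hypothetical, simplified cross-bundle side input cache.
// Entries are stored under cache tokens; a lookup only succeeds if the
// current bundle declared that token valid for the side input.
type tokenCache struct {
	mu      sync.Mutex
	valid   map[cacheKey]string // side input -> token valid for this bundle
	entries map[string][]string // token -> materialized side input values
}

func newTokenCache() *tokenCache {
	return &tokenCache{valid: map[cacheKey]string{}, entries: map[string][]string{}}
}

// SetValidTokens records the tokens sent with the current bundle request.
func (c *tokenCache) SetValidTokens(tokens map[cacheKey]string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.valid = tokens
}

// QueryCache returns the cached values and true on a hit.
func (c *tokenCache) QueryCache(transformID, sideInputID string) ([]string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	tok, ok := c.valid[cacheKey{transformID, sideInputID}]
	if !ok {
		return nil, false // no valid token: never reuse data
	}
	v, ok := c.entries[tok]
	return v, ok
}

// SetCache stores values under the token currently valid for the side input.
// Without a valid token nothing is stored. Stale entries are never evicted here.
func (c *tokenCache) SetCache(transformID, sideInputID string, values []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if tok, ok := c.valid[cacheKey{transformID, sideInputID}]; ok {
		c.entries[tok] = values
	}
}

func main() {
	c := newTokenCache()
	k := cacheKey{"ptransform-1", "side0"}

	// Bundle 1: token "t1" is valid; the first lookup misses, then we populate.
	c.SetValidTokens(map[cacheKey]string{k: "t1"})
	_, hit := c.QueryCache("ptransform-1", "side0")
	fmt.Println("bundle 1 hit:", hit)
	c.SetCache("ptransform-1", "side0", []string{"a", "b"})

	// Bundle 2: same token, so the materialized side input is reused.
	c.SetValidTokens(map[cacheKey]string{k: "t1"})
	v, hit := c.QueryCache("ptransform-1", "side0")
	fmt.Println("bundle 2 hit:", hit, v)

	// Bundle 3: a new token, so the old entry is no longer visible.
	c.SetValidTokens(map[cacheKey]string{k: "t2"})
	_, hit = c.QueryCache("ptransform-1", "side0")
	fmt.Println("bundle 3 hit:", hit)
}
```

The key design point is that validity comes from the runner per bundle: the SDK never decides on its own that cached data is still current.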
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make the review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   `ValidatesRunner` compliance status (on master branch)
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/badge/icon?subject=V1+Streaming">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon?subject=V1+Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/badge/icon?subject=V2+Streaming">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon?subject=Java+8">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon?subject=Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon?subject=Portable+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/badge/icon?subject=Portable">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon?subject=Structured+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon?subject=ValCont">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Examples testing status on various runners
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/badge/icon?subject=V1+Java11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Post-Commit SDK/Transform Integration Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Go</th>
         <th>Java</th>
         <th>Python</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon?subject=3.6">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon?subject=3.7">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon?subject=3.8">
           </a>
         </td>
       </tr>
     </tbody>
   </table>
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>---</th>
         <th>Java</th>
         <th>Python</th>
         <th>Go</th>
         <th>Website</th>
         <th>Whitespace</th>
         <th>Typescript</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Non-portable</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon?subject=Tests">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon?subject=Lint">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon?subject=Docker">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon?subject=Docs">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Portable</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for the trigger phrases, statuses, and links of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jrmccluskey commented on a change in pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on a change in pull request #15594:
URL: https://github.com/apache/beam/pull/15594#discussion_r716979058



##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -33,6 +33,7 @@ const iterableSideInputKey = ""
 // encapsulates StreamID and coding as needed.
 type SideInputAdapter interface {
 	NewIterable(ctx context.Context, reader StateReader, w typex.Window) (ReStream, error)
+	GetIDs() (StreamID, string)

Review comment:
       That's a solution if we want to move the cache-checking logic here, although it seems it would still need a new method in the interface to handle it (changing the NewIterable signature to return a ReStream *or* a ReusableInput feels messy). 







[GitHub] [beam] jrmccluskey commented on pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on pull request #15594:
URL: https://github.com/apache/beam/pull/15594#issuecomment-928013001


   Run Go PostCommit





[GitHub] [beam] jrmccluskey commented on pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on pull request #15594:
URL: https://github.com/apache/beam/pull/15594#issuecomment-928034830


   Run Go Samza ValidatesRunner





[GitHub] [beam] lostluck commented on pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15594:
URL: https://github.com/apache/beam/pull/15594#issuecomment-928163755


   Run Go PostCommit





[GitHub] [beam] lostluck commented on a change in pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15594:
URL: https://github.com/apache/beam/pull/15594#discussion_r716947697



##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
 	offset := len(param) - len(side)
 
 	var ret []ReusableInput
+	var cache *statecache.SideInputCache
+	if reader != nil {
+		cache = reader.GetSideInputCache()
+	} else {
+		cache = &statecache.SideInputCache{}
+		cache.Init(1)

Review comment:
       Here's where a factory function or similar would be useful. Alternatively, a simpler, single-element cache that adheres to the same interface as the other cache would work.
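A sketch of the single-element alternative, assuming a hypothetical `SideInputCache` interface with `QueryCache`/`SetCache` methods and a stand-in `ReusableInput` type (neither is the real exec definition). It remembers only the most recently stored entry, which is enough for the nil-reader fallback without pulling in the full cache.

```go
package main

import "fmt"

// ReusableInput is a hypothetical stand-in for exec.ReusableInput.
type ReusableInput interface {
	Value() interface{}
}

// SideInputCache is a hypothetical minimal interface shared by the full
// cache and the fallback.
type SideInputCache interface {
	QueryCache(transformID, sideInputID string) ReusableInput
	SetCache(transformID, sideInputID string, input ReusableInput)
}

// singleEntryCache remembers only the most recently stored side input.
type singleEntryCache struct {
	transformID, sideInputID string
	input                    ReusableInput
}

// QueryCache returns the stored input only if both IDs match; otherwise nil.
func (c *singleEntryCache) QueryCache(transformID, sideInputID string) ReusableInput {
	if c.input != nil && c.transformID == transformID && c.sideInputID == sideInputID {
		return c.input
	}
	return nil
}

// SetCache overwrites (evicts) whatever entry was stored before.
func (c *singleEntryCache) SetCache(transformID, sideInputID string, input ReusableInput) {
	c.transformID, c.sideInputID, c.input = transformID, sideInputID, input
}

// constInput is a trivial ReusableInput used only for the demo.
type constInput struct{ v interface{} }

func (c constInput) Value() interface{} { return c.v }

func main() {
	var cache SideInputCache = &singleEntryCache{}
	cache.SetCache("pt", "side0", constInput{42})
	fmt.Println(cache.QueryCache("pt", "side0") != nil) // true
	cache.SetCache("pt", "side1", constInput{7})
	fmt.Println(cache.QueryCache("pt", "side0") != nil) // false: evicted
}
```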

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -24,6 +24,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"

Review comment:
       We don't want the exec package to depend on parts defined in harness, at least explicitly. It's bad coupling, and makes the SDK tree harder to follow.
   
   This means we need either a factory function (or similar) that is passed in to get or initialize the cache, or a change so the reader never provides a nil cache. In particular, we can have the exec package define an interface describing how it interacts with the cache; then there's no explicit coupling at all. This is good Go design: interfaces should be defined by the caller/user of the interface, not the other way around.
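A sketch of the consumer-defined interface idea, with both "packages" collapsed into one file for illustration. `sideInputCache`, `noopCache`, `cacheOrNoop`, and `mapCache` are hypothetical names. The point is that the concrete cache satisfies the interface implicitly, so the consuming side never imports the provider, and a no-op fallback removes the nil case.

```go
package main

import "fmt"

// --- Consuming side (exec, hypothetically) ---

// sideInputCache is defined by its user. Any type with this method
// satisfies it, so the consumer never imports the providing package.
type sideInputCache interface {
	QueryCache(transformID, sideInputID string) []string
}

// noopCache always misses; it stands in when no real cache is available
// so callers never have to nil-check.
type noopCache struct{}

func (noopCache) QueryCache(string, string) []string { return nil }

// cacheOrNoop returns c, or a cache that always misses if c is a nil
// interface. (A typed nil pointer wrapped in the interface is not caught.)
func cacheOrNoop(c sideInputCache) sideInputCache {
	if c == nil {
		return noopCache{}
	}
	return c
}

// --- Providing side (harness, hypothetically) ---

// mapCache is a concrete cache; it implements sideInputCache implicitly.
type mapCache map[[2]string][]string

func (m mapCache) QueryCache(transformID, sideInputID string) []string {
	return m[[2]string{transformID, sideInputID}]
}

func main() {
	var hc sideInputCache = mapCache{{"pt", "side0"}: {"a", "b"}}
	fmt.Println(cacheOrNoop(hc).QueryCache("pt", "side0"))  // [a b]
	fmt.Println(cacheOrNoop(nil).QueryCache("pt", "side0")) // []
}
```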

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
 	offset := len(param) - len(side)
 
 	var ret []ReusableInput
+	var cache *statecache.SideInputCache
+	if reader != nil {
+		cache = reader.GetSideInputCache()
+	} else {
+		cache = &statecache.SideInputCache{}
+		cache.Init(1)
+	}
 	for i := 0; i < len(streams); i++ {
+		sid, sideInputID := side[i].GetIDs()
+		var transformID string
+		if sideInputID == "" {
+			transformID = ""

Review comment:
       It would be worth commenting on why this branching is necessary. Do we know what causes this case?

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
 	offset := len(param) - len(side)
 
 	var ret []ReusableInput
+	var cache *statecache.SideInputCache
+	if reader != nil {

Review comment:
       When is the reader going to be nil? Is it something to worry about?

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
 	offset := len(param) - len(side)
 
 	var ret []ReusableInput
+	var cache *statecache.SideInputCache
+	if reader != nil {
+		cache = reader.GetSideInputCache()
+	} else {
+		cache = &statecache.SideInputCache{}
+		cache.Init(1)
+	}
 	for i := 0; i < len(streams); i++ {
+		sid, sideInputID := side[i].GetIDs()
+		var transformID string
+		if sideInputID == "" {
+			transformID = ""
+		} else {
+			transformID = sid.PtransformID
+		}
+		c := cache.QueryCache(transformID, sideInputID)
+		// Cache hit
+		if c != nil {
+			ret = append(ret, c)
+			continue
+		}

Review comment:
       We can inline this call into the `if` statement:
   
   ```suggestion
   		if c := cache.QueryCache(transformID, sideInputID); c != nil {
   			// Cache hit
   			ret = append(ret, c)
   			continue
   		}
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -33,6 +33,7 @@ const iterableSideInputKey = ""
 // encapsulates StreamID and coding as needed.
 type SideInputAdapter interface {
 	NewIterable(ctx context.Context, reader StateReader, w typex.Window) (ReStream, error)
+	GetIDs() (StreamID, string)

Review comment:
       TBH it still feels like the Adapter is the right place for the cache to be looked up, since it deals with the actual elements and produces the ReStream instances. It's already getting a StateReader that it can pull the cache from too (depending on what happens to the cache, of course; it might be best if it's simply hidden within the reader entirely).
   
   Specifically, any time you need to add methods to access data elsewhere, the question is whether that has to be the case. What does the solution look like when you're not able, or allowed, to do that?
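A sketch of the adapter doing its own cache lookup, assuming hypothetical `cache` and `stateReader` interfaces (not Beam's real `StateReader`) and plain `[]string` values standing in for ReStreams. Callers keep calling `NewIterable` unchanged; only the adapter knows a cache exists, so no `GetIDs`-style accessor is needed.

```go
package main

import "fmt"

// key identifies a side input by (transform ID, side input ID).
type key struct{ transformID, sideInputID string }

// cache and stateReader are hypothetical stand-ins, not Beam's real types.
type cache interface {
	Get(k key) ([]string, bool)
	Put(k key, values []string)
}

type stateReader interface {
	Cache() cache        // assumed never nil (e.g. a no-op cache)
	Read(k key) []string // fetches side input data from the runner
}

// adapter already knows its own IDs, so it can build the cache key itself.
type adapter struct{ id key }

// NewIterable hides the cache: callers see the same method as before.
func (a *adapter) NewIterable(r stateReader) []string {
	if v, ok := r.Cache().Get(a.id); ok {
		return v // cache hit: no runner round trip
	}
	v := r.Read(a.id)
	r.Cache().Put(a.id, v)
	return v
}

// Demo implementations.
type mapCache map[key][]string

func (m mapCache) Get(k key) ([]string, bool) {
	v, ok := m[k]
	return v, ok
}

func (m mapCache) Put(k key, values []string) { m[k] = values }

type fakeReader struct {
	c     mapCache
	reads int // counts simulated runner round trips
}

func (f *fakeReader) Cache() cache { return f.c }

func (f *fakeReader) Read(key) []string {
	f.reads++
	return []string{"a", "b"}
}

func main() {
	r := &fakeReader{c: mapCache{}}
	a := &adapter{id: key{"pt", "side0"}}
	fmt.Println(a.NewIterable(r), a.NewIterable(r))
	fmt.Println("runner reads:", r.reads) // 1
}
```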

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -76,6 +77,11 @@ func (s *sideInputAdapter) NewIterable(ctx context.Context, reader StateReader,
 	}, nil
 }
 
+// GetIDs returns the StreamID and Side Input ID for the adapter. Used primarily for sidei nput caching.

Review comment:
       typo
   ```suggestion
   // GetIDs returns the StreamID and Side Input ID for the adapter. Used primarily for side input caching.
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go
##########
@@ -125,7 +125,7 @@ func makeRequest(transformID, sideInputID string, t token) fnpb.ProcessBundleReq
 	wrap.SideInput = &side
 	tok.Type = &wrap
 	tok.Token = []byte(t)
-	return tok
+	return &tok

Review comment:
       Style-wise, we should probably inline all of this, as it's easier on the reader to see that the compiler can make all the allocations at once.
   ```
   	return &fnpb.ProcessBundleRequest_CacheToken{
   		Token: []byte(t),
   		Type:  &fnpb.ProcessBundleRequest_CacheToken_SideInput_{
   			SideInput: &fnpb.ProcessBundleRequest_CacheToken_SideInput{
   				TransformId: transformID,
   				SideInputId: sideInputID,
   			},
   		},
   	}
   }
   ```
   
   The main reason to separate things out when constructing protos, rather than inlining, is error handling. Nothing in this function can fail, so it can all be inlined.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -76,6 +78,19 @@ func (s *sideInputAdapter) NewIterable(ctx context.Context, reader StateReader,
 	}, nil
 }
 
+// QueryCache checks a reader's side input cache for an entry with a PtransformID and sideInputID
+// and returns the entry.
+func (s *sideInputAdapter) QueryCache(reader StateReader) ReusableInput {
+	input := reader.GetSideInputCache().QueryCache(s.sid.PtransformID, s.sideInputID)
+	return input

Review comment:
       Aside: in this situation, simply return the result directly rather than assigning it to a new variable; it's cleaner.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,13 +268,24 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
 	offset := len(param) - len(side)
 
 	var ret []ReusableInput
-	for i := 0; i < len(streams); i++ {
-		s, err := makeSideInput(in[i+1].Kind, fn.Param[param[i+offset]].T, streams[i])
+	for i, adapter := range side {
+		// Cache hit
+		if c := adapter.QueryCache(reader); c != nil {

Review comment:
       Why not put this logic into the NewIterable method instead? Is it important that the makeSideInputs code is aware that a cache is being used? Does this code *need* to know these details of how the adapter works?
   
   Is there any reason the adapter can't use the cache itself and still produce ReStreams (and, if it's still a good idea, cache ReusableInputs), so that this code doesn't need to change or know that caching is happening?
   
   As the earlier failure showed, ReusableInputs aren't thread-safe, so we need to cache something that is, and build what we use later on from it.
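A sketch of caching a thread-safe value and building per-use state from it, assuming the shared value is an immutable slice of decoded elements and the per-use value is a cursor-based iterator. Both `snapshot` and `iterator` are hypothetical stand-ins for whatever the SDK would actually cache and for ReusableInput. The lock guards only the map; each caller gets its own iterator.

```go
package main

import (
	"fmt"
	"sync"
)

// snapshot holds fully decoded side input values. It is never mutated
// after creation, so many goroutines may share it safely.
type snapshot struct{ values []int }

// iterator is per-use state (like a ReusableInput): it has a cursor,
// so it must not be shared between goroutines.
type iterator struct {
	s   *snapshot
	pos int
}

func (it *iterator) Next() (int, bool) {
	if it.pos >= len(it.s.values) {
		return 0, false
	}
	v := it.s.values[it.pos]
	it.pos++
	return v, true
}

// snapshotCache stores shared snapshots, never iterators.
type snapshotCache struct {
	mu sync.Mutex
	m  map[string]*snapshot
}

// Iterator returns a fresh iterator, loading the snapshot on a miss.
// For simplicity the load runs while holding the lock.
func (c *snapshotCache) Iterator(key string, load func() []int) *iterator {
	c.mu.Lock()
	s, ok := c.m[key]
	if !ok {
		s = &snapshot{values: load()}
		c.m[key] = s
	}
	c.mu.Unlock()
	return &iterator{s: s}
}

// sum drains an iterator.
func sum(it *iterator) int {
	total := 0
	for v, ok := it.Next(); ok; v, ok = it.Next() {
		total += v
	}
	return total
}

func main() {
	c := &snapshotCache{m: map[string]*snapshot{}}
	var wg sync.WaitGroup
	sums := make([]int, 4)
	for i := range sums {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Each goroutine writes only its own slot in sums.
			sums[i] = sum(c.Iterator("pt/side0", func() []int { return []int{1, 2, 3} }))
		}(i)
	}
	wg.Wait()
	fmt.Println(sums) // [6 6 6 6]
}
```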







[GitHub] [beam] jrmccluskey closed pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
jrmccluskey closed pull request #15594:
URL: https://github.com/apache/beam/pull/15594


   





[GitHub] [beam] jrmccluskey commented on pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on pull request #15594:
URL: https://github.com/apache/beam/pull/15594#issuecomment-928087596


   R: @lostluck 





[GitHub] [beam] jrmccluskey commented on a change in pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on a change in pull request #15594:
URL: https://github.com/apache/beam/pull/15594#discussion_r717006749



##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
 	offset := len(param) - len(side)
 
 	var ret []ReusableInput
+	var cache *statecache.SideInputCache
+	if reader != nil {
+		cache = reader.GetSideInputCache()
+	} else {
+		cache = &statecache.SideInputCache{}
+		cache.Init(1)
+	}
 	for i := 0; i < len(streams); i++ {
+		sid, sideInputID := side[i].GetIDs()
+		var transformID string
+		if sideInputID == "" {
+			transformID = ""

Review comment:
       It was caused by the other adapter implementations used in the direct runner and in testing, which is no longer a problem.







[GitHub] [beam] jrmccluskey commented on pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on pull request #15594:
URL: https://github.com/apache/beam/pull/15594#issuecomment-932438486


   Hit a concurrency issue that requires a bigger rewrite and some design tweaks; splitting the work out into a new PR.





[GitHub] [beam] jrmccluskey commented on a change in pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on a change in pull request #15594:
URL: https://github.com/apache/beam/pull/15594#discussion_r717006062



##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -24,6 +24,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"

Review comment:
       Removed the reference, as it's no longer necessary.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
 	offset := len(param) - len(side)
 
 	var ret []ReusableInput
+	var cache *statecache.SideInputCache
+	if reader != nil {

Review comment:
       Fixed

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
 	offset := len(param) - len(side)
 
 	var ret []ReusableInput
+	var cache *statecache.SideInputCache
+	if reader != nil {
+		cache = reader.GetSideInputCache()
+	} else {
+		cache = &statecache.SideInputCache{}
+		cache.Init(1)

Review comment:
       No longer necessary







[GitHub] [beam] lostluck commented on a change in pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15594:
URL: https://github.com/apache/beam/pull/15594#discussion_r717119849



##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -76,6 +78,19 @@ func (s *sideInputAdapter) NewIterable(ctx context.Context, reader StateReader,
 	}, nil
 }
 
+// QueryCache checks a reader's side input cache for an entry with a PtransformID and sideInputID
+// and returns the entry.
+func (s *sideInputAdapter) QueryCache(reader StateReader) ReusableInput {
+	input := reader.GetSideInputCache().QueryCache(s.sid.PtransformID, s.sideInputID)
+	return input

Review comment:
       Aside: simply return the result directly here rather than assigning it to a new variable; it's cleaner.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,13 +268,24 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
 	offset := len(param) - len(side)
 
 	var ret []ReusableInput
-	for i := 0; i < len(streams); i++ {
-		s, err := makeSideInput(in[i+1].Kind, fn.Param[param[i+offset]].T, streams[i])
+	for i, adapter := range side {
+		// Cache hit
+		if c := adapter.QueryCache(reader); c != nil {

Review comment:
       Why not put this logic into the NewIterable method instead? Is it important that the makeSideInputs code is aware that a cache is being used? Does this code *need* to know these details of how the adapter works?
   
   Is there any reason the adapter can't use the cache itself and still produce ReStreams (and, if it's still a good idea, cache ReusableInputs), so that this code doesn't need to change or know the details of what's happening?
   
   Per the earlier failure, ReusableInputs aren't thread-safe, so we need to cache something that is, and build what we use later from it.







[GitHub] [beam] jrmccluskey commented on pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on pull request #15594:
URL: https://github.com/apache/beam/pull/15594#issuecomment-928014561









[GitHub] [beam] lostluck commented on a change in pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15594:
URL: https://github.com/apache/beam/pull/15594#discussion_r716947697



##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
 	offset := len(param) - len(side)
 
 	var ret []ReusableInput
+	var cache *statecache.SideInputCache
+	if reader != nil {
+		cache = reader.GetSideInputCache()
+	} else {
+		cache = &statecache.SideInputCache{}
+		cache.Init(1)

Review comment:
       This is where a factory function (or similar) would be useful. Alternatively, use a simpler, single-element cache that satisfies the same interface as the other cache.
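A minimal sketch of the single-element alternative, assuming a hypothetical `SideInputCache` interface modeled on the `QueryCache(transformID, sideInputID)` call in this PR. It stores plain value slices to stay self-contained; the real cache's method set and value type may differ.

```go
package main

import "fmt"

// SideInputCache is a hypothetical interface modeled on the QueryCache call in
// this PR; it uses plain value slices to keep the sketch self-contained.
type SideInputCache interface {
	QueryCache(transformID, sideInputID string) []interface{}
	SetCache(transformID, sideInputID string, values []interface{})
}

// singleEntryCache remembers only the most recently stored side input. That
// is enough for tests and other callers without a real cache. It is not
// safe for concurrent use.
type singleEntryCache struct {
	transformID, sideInputID string
	values                   []interface{}
}

// QueryCache returns the stored values on a key match and nil on a miss.
func (c *singleEntryCache) QueryCache(transformID, sideInputID string) []interface{} {
	if c.transformID != transformID || c.sideInputID != sideInputID {
		return nil
	}
	return c.values
}

// SetCache replaces whatever entry was stored before.
func (c *singleEntryCache) SetCache(transformID, sideInputID string, values []interface{}) {
	c.transformID, c.sideInputID, c.values = transformID, sideInputID, values
}

// Compile-time check that singleEntryCache satisfies the interface.
var _ SideInputCache = (*singleEntryCache)(nil)

func main() {
	var c SideInputCache = &singleEntryCache{}
	c.SetCache("t1", "s1", []interface{}{"a"})
	fmt.Println(c.QueryCache("t1", "s1"), c.QueryCache("t1", "s2"))
}
```

Because it satisfies the same interface, callers that only hold a `SideInputCache` cannot tell it apart from the full cache, so no nil branch is needed.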

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -24,6 +24,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"

Review comment:
       We don't want the exec package to depend on parts defined in harness, at least explicitly. It's bad coupling, and makes the SDK tree harder to follow.
   
   This means we need either a factory function (or similar), passed in, that gets or initializes the cache, or a change so that the reader never provides a nil cache. In particular, the exec package can define an interface describing how it interacts with the cache, and then there's no explicit coupling at all. This is good Go design, as interfaces should be defined by the caller/user of the interface, not the other way around.
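A minimal sketch of a consumer-defined interface combined with a never-nil cache, assuming hypothetical names throughout (`sideInputCache`, `stateReader`, `noopCache`, `cacheFor`, `mapCache`, `fakeReader`). Here `package main` stands in for exec, and `mapCache` plays the role of a harness-side implementation that satisfies the interface without exec importing harness.

```go
package main

import "fmt"

// sideInputCache is declared by the consuming package (exec, in the review)
// and lists only the method it calls. A harness type with this method
// satisfies it implicitly, so exec never has to import harness.
type sideInputCache interface {
	QueryCache(transformID, sideInputID string) []interface{}
}

// stateReader is a hypothetical, trimmed-down reader that hands out a cache.
type stateReader interface {
	GetSideInputCache() sideInputCache
}

// noopCache always misses. Returning it instead of nil removes the need for
// nil checks at every call site.
type noopCache struct{}

func (noopCache) QueryCache(string, string) []interface{} { return nil }

// cacheFor returns a usable cache even when the reader is nil or hands back
// a nil interface value.
func cacheFor(r stateReader) sideInputCache {
	if r == nil {
		return noopCache{}
	}
	if c := r.GetSideInputCache(); c != nil {
		return c
	}
	return noopCache{}
}

// key identifies a cached side input.
type key struct{ transformID, sideInputID string }

// mapCache plays the role of the harness-side implementation.
type mapCache map[key][]interface{}

func (m mapCache) QueryCache(transformID, sideInputID string) []interface{} {
	return m[key{transformID, sideInputID}]
}

// fakeReader is a test double for stateReader.
type fakeReader struct{ cache sideInputCache }

func (f fakeReader) GetSideInputCache() sideInputCache { return f.cache }

func main() {
	r := fakeReader{cache: mapCache{{"t1", "s1"}: {"x"}}}
	fmt.Println(cacheFor(r).QueryCache("t1", "s1"), cacheFor(nil).QueryCache("t1", "s1"))
}
```

The dependency then points from harness to exec only, and tests that pass no reader get a cache that simply always misses.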

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
 	offset := len(param) - len(side)
 
 	var ret []ReusableInput
+	var cache *statecache.SideInputCache
+	if reader != nil {
+		cache = reader.GetSideInputCache()
+	} else {
+		cache = &statecache.SideInputCache{}
+		cache.Init(1)
+	}
 	for i := 0; i < len(streams); i++ {
+		sid, sideInputID := side[i].GetIDs()
+		var transformID string
+		if sideInputID == "" {
+			transformID = ""

Review comment:
       It would be worth commenting on why this branching is necessary. Do we know what causes this case?

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
 	offset := len(param) - len(side)
 
 	var ret []ReusableInput
+	var cache *statecache.SideInputCache
+	if reader != nil {

Review comment:
       When is the reader going to be nil? Is it something to worry about?

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
 	offset := len(param) - len(side)
 
 	var ret []ReusableInput
+	var cache *statecache.SideInputCache
+	if reader != nil {
+		cache = reader.GetSideInputCache()
+	} else {
+		cache = &statecache.SideInputCache{}
+		cache.Init(1)
+	}
 	for i := 0; i < len(streams); i++ {
+		sid, sideInputID := side[i].GetIDs()
+		var transformID string
+		if sideInputID == "" {
+			transformID = ""
+		} else {
+			transformID = sid.PtransformID
+		}
+		c := cache.QueryCache(transformID, sideInputID)
+		// Cache hit
+		if c != nil {
+			ret = append(ret, c)
+			continue
+		}

Review comment:
       We can inline this call into the if statement:
   
   ```suggestion
   		if c := cache.QueryCache(transformID, sideInputID); c != nil {
   			// Cache hit
   			ret = append(ret, c)
   			continue
   		}
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -33,6 +33,7 @@ const iterableSideInputKey = ""
 // encapsulates StreamID and coding as needed.
 type SideInputAdapter interface {
 	NewIterable(ctx context.Context, reader StateReader, w typex.Window) (ReStream, error)
+	GetIDs() (StreamID, string)

Review comment:
       TBH it still feels like the Adapter is the right place to look up the cache, since it deals with the actual elements and produces the ReStream instances. It's already getting a StateReader that it can pull the cache from, too (depending on what happens to the cache, of course; it might be best if the cache is simply hidden within the reader entirely).
   
   Specifically, any time you need to add methods to access data elsewhere, ask whether that has to be the case. What does the solution look like when you're not able or allowed to do that?

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -76,6 +77,11 @@ func (s *sideInputAdapter) NewIterable(ctx context.Context, reader StateReader,
 	}, nil
 }
 
+// GetIDs returns the StreamID and Side Input ID for the adapter. Used primarily for sidei nput caching.

Review comment:
       typo
   ```suggestion
   // GetIDs returns the StreamID and Side Input ID for the adapter. Used primarily for side input caching.
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go
##########
@@ -125,7 +125,7 @@ func makeRequest(transformID, sideInputID string, t token) fnpb.ProcessBundleReq
 	wrap.SideInput = &side
 	tok.Type = &wrap
 	tok.Token = []byte(t)
-	return tok
+	return &tok

Review comment:
       Style-wise, we should probably inline all of this, since it makes it easier for the reader to see that the compiler can make all the allocations at once.
   ```
   	return &fnpb.ProcessBundleRequest_CacheToken{
   		Token: []byte(t),
   		Type:  &fnpb.ProcessBundleRequest_CacheToken_SideInput_{
   			SideInput: &fnpb.ProcessBundleRequest_CacheToken_SideInput{
   				TransformId: transformID,
   				SideInputId: sideInputID,
   			},
   		},
   	}
   }
   ```
   
   The main reason to separate things out when constructing protos, rather than inlining, is error handling. This function doesn't have anything that can fail, so it can all be inlined.
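A runnable sketch of the inlined construction. The types below are hypothetical stand-ins shaped like protoc-gen-go output for a message with a oneof field (a `Type` interface field plus a wrapper struct per case); they are not the real `fnpb` types, and `token` is assumed to be a string-based type.

```go
package main

import "fmt"

// Stand-in types shaped like generated code for a oneof; NOT the real fnpb API.
type CacheToken struct {
	Type  isCacheToken_Type
	Token []byte
}

// isCacheToken_Type is the sealed interface implemented by each oneof case.
type isCacheToken_Type interface{ isCacheToken_Type() }

type CacheToken_SideInput struct {
	TransformId string
	SideInputId string
}

// CacheToken_SideInput_ is the oneof wrapper selecting the side input case.
type CacheToken_SideInput_ struct {
	SideInput *CacheToken_SideInput
}

func (*CacheToken_SideInput_) isCacheToken_Type() {}

// token is a hypothetical string-based token type.
type token string

// makeRequest builds the whole nested value in one expression: nothing can
// fail, so there is no reason for intermediate variables.
func makeRequest(transformID, sideInputID string, t token) *CacheToken {
	return &CacheToken{
		Token: []byte(t),
		Type: &CacheToken_SideInput_{
			SideInput: &CacheToken_SideInput{
				TransformId: transformID,
				SideInputId: sideInputID,
			},
		},
	}
}

func main() {
	tok := makeRequest("t1", "s1", "tok-1")
	si := tok.Type.(*CacheToken_SideInput_).SideInput
	fmt.Println(si.TransformId, si.SideInputId, string(tok.Token))
}
```

If construction could fail (for example, marshaling a nested message), separate statements with error checks would be the better fit, which is the trade-off described above.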







[GitHub] [beam] jrmccluskey commented on pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on pull request #15594:
URL: https://github.com/apache/beam/pull/15594#issuecomment-928072818


   Run Go PostCommit





[GitHub] [beam] jrmccluskey commented on a change in pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on a change in pull request #15594:
URL: https://github.com/apache/beam/pull/15594#discussion_r716978714



##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
 	offset := len(param) - len(side)
 
 	var ret []ReusableInput
+	var cache *statecache.SideInputCache
+	if reader != nil {

Review comment:
       There are multiple tests in the exec package that don't provide a reader, so I spent a bit of time guarding against nil dereferences. I'll take some time to clean this up.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -33,6 +33,7 @@ const iterableSideInputKey = ""
 // encapsulates StreamID and coding as needed.
 type SideInputAdapter interface {
 	NewIterable(ctx context.Context, reader StateReader, w typex.Window) (ReStream, error)
+	GetIDs() (StreamID, string)

Review comment:
       That's a solution if we want to move the cache-checking logic here, although it seems like it would still need a new function in the interface to handle it (changing the NewIterable signature to return a ReStream *or* a ReusableInput feels messy).

##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go
##########
@@ -125,7 +125,7 @@ func makeRequest(transformID, sideInputID string, t token) fnpb.ProcessBundleReq
 	wrap.SideInput = &side
 	tok.Type = &wrap
 	tok.Token = []byte(t)
-	return tok
+	return &tok

Review comment:
       Done

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -33,6 +33,7 @@ const iterableSideInputKey = ""
 // encapsulates StreamID and coding as needed.
 type SideInputAdapter interface {
 	NewIterable(ctx context.Context, reader StateReader, w typex.Window) (ReStream, error)
+	GetIDs() (StreamID, string)

Review comment:
       Moving the cache accesses into the Adapter cleaned up a lot of the nil checks. Dropped GetIDs in favor of a first pass at cache-aware functions.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -24,6 +24,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"

Review comment:
       Removed the reference, as it's no longer necessary.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
 	offset := len(param) - len(side)
 
 	var ret []ReusableInput
+	var cache *statecache.SideInputCache
+	if reader != nil {

Review comment:
       Fixed

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
 	offset := len(param) - len(side)
 
 	var ret []ReusableInput
+	var cache *statecache.SideInputCache
+	if reader != nil {
+		cache = reader.GetSideInputCache()
+	} else {
+		cache = &statecache.SideInputCache{}
+		cache.Init(1)

Review comment:
       No longer necessary

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
 	offset := len(param) - len(side)
 
 	var ret []ReusableInput
+	var cache *statecache.SideInputCache
+	if reader != nil {
+		cache = reader.GetSideInputCache()
+	} else {
+		cache = &statecache.SideInputCache{}
+		cache.Init(1)
+	}
 	for i := 0; i < len(streams); i++ {
+		sid, sideInputID := side[i].GetIDs()
+		var transformID string
+		if sideInputID == "" {
+			transformID = ""

Review comment:
       It was caused by the other adapter implementations used in the direct runner and in testing, which is no longer a problem.







[GitHub] [beam] jrmccluskey commented on a change in pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on a change in pull request #15594:
URL: https://github.com/apache/beam/pull/15594#discussion_r716982345



##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go
##########
@@ -125,7 +125,7 @@ func makeRequest(transformID, sideInputID string, t token) fnpb.ProcessBundleReq
 	wrap.SideInput = &side
 	tok.Type = &wrap
 	tok.Token = []byte(t)
-	return tok
+	return &tok

Review comment:
       Done







[GitHub] [beam] jrmccluskey commented on a change in pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on a change in pull request #15594:
URL: https://github.com/apache/beam/pull/15594#discussion_r717005723



##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -33,6 +33,7 @@ const iterableSideInputKey = ""
 // encapsulates StreamID and coding as needed.
 type SideInputAdapter interface {
 	NewIterable(ctx context.Context, reader StateReader, w typex.Window) (ReStream, error)
+	GetIDs() (StreamID, string)

Review comment:
       Moving the cache accesses into the Adapter cleaned up a lot of the nil checks. Dropped GetIDs in favor of a first pass at cache-aware functions.







[GitHub] [beam] lostluck commented on pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15594:
URL: https://github.com/apache/beam/pull/15594#issuecomment-928163755


   Run Go PostCommit





[GitHub] [beam] jrmccluskey commented on pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on pull request #15594:
URL: https://github.com/apache/beam/pull/15594#issuecomment-928014134


   Run Go Flink ValidatesRunner





[GitHub] [beam] jrmccluskey commented on a change in pull request #15594: [BEAM-11097] Enable cross-bundle side input caching

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on a change in pull request #15594:
URL: https://github.com/apache/beam/pull/15594#discussion_r717960204



##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -76,6 +78,19 @@ func (s *sideInputAdapter) NewIterable(ctx context.Context, reader StateReader,
 	}, nil
 }
 
+// QueryCache checks a reader's side input cache for an entry with a PtransformID and sideInputID
+// and returns the entry.
+func (s *sideInputAdapter) QueryCache(reader StateReader) ReusableInput {
+	input := reader.GetSideInputCache().QueryCache(s.sid.PtransformID, s.sideInputID)
+	return input

Review comment:
       Fixed



