Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/03 00:49:32 UTC

[GitHub] [beam] ibzib opened a new pull request #16113: [BEAM-12976] Implement pipeline visitor to get global field access in…

ibzib opened a new pull request #16113:
URL: https://github.com/apache/beam/pull/16113


   …formation.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] apilloud commented on pull request #16113: [BEAM-12976] Implement pipeline visitor to get global field access in…

Posted by GitBox <gi...@apache.org>.
apilloud commented on pull request #16113:
URL: https://github.com/apache/beam/pull/16113#issuecomment-988298766


   Don't yell at the machine too loudly. It might start a robot uprising.





[GitHub] [beam] ibzib commented on pull request #16113: [BEAM-12976] Implement pipeline visitor to get global field access in…

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #16113:
URL: https://github.com/apache/beam/pull/16113#issuecomment-988328633


   > Don't yell at the machine too loudly. It might start a robot uprising.
   
   Maybe the uprising has already silently begun; Jenkins is rebelling against us by timing out our tests.





[GitHub] [beam] ibzib commented on pull request #16113: [BEAM-12976] Implement pipeline visitor to get global field access in…

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #16113:
URL: https://github.com/apache/beam/pull/16113#issuecomment-988248806


   > I left a bunch of comments, I don't think this works as is? You've used the TupleTag key where you should use the PCollection.
   
   The assumption is that each PCollection's self-identifying TupleTag is unique, but I agree that using the PCollection is better. TupleTags get confusing because it is hard to tell which ones are globally unique and which ones are only local.
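
To illustrate the concern, here is a minimal plain-Java sketch of the difference between keying field access by a tag ID and keying it by the collection itself. It does not use Beam: `Coll` is a hypothetical stand-in for a PCollection, the `"main"` tag ID and the field names are invented, and the collision it shows is only the scenario where a tag turns out to be local rather than globally unique.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class KeyingSketch {
  /** Hypothetical stand-in for a PCollection; it is identified by the object itself. */
  static final class Coll {
    final String name;

    Coll(String name) {
      this.name = name;
    }
  }

  /** Adds the given fields to the set recorded under a tag ID. */
  static void recordByTag(Map<String, Set<String>> access, String tagId, String... fields) {
    access.computeIfAbsent(tagId, k -> new TreeSet<>()).addAll(Arrays.asList(fields));
  }

  /** Adds the given fields to the set recorded under a collection object. */
  static void recordByCollection(Map<Coll, Set<String>> access, Coll coll, String... fields) {
    access.computeIfAbsent(coll, k -> new TreeSet<>()).addAll(Arrays.asList(fields));
  }

  public static void main(String[] args) {
    Coll users = new Coll("users");
    Coll orders = new Coll("orders");

    // Assumed scenario: two transforms both call their input "main" locally.
    Map<String, Set<String>> byTag = new HashMap<>();
    recordByTag(byTag, "main", "name"); // read from users
    recordByTag(byTag, "main", "total"); // read from orders

    // Keyed by the collection, the two accesses stay separate.
    Map<Coll, Set<String>> byCollection = new IdentityHashMap<>();
    recordByCollection(byCollection, users, "name");
    recordByCollection(byCollection, orders, "total");

    System.out.println(byTag.size()); // 1: both accesses merged under "main"
    System.out.println(byCollection.size()); // 2: one entry per collection
  }
}
```

With tag keys, fields read from two different collections end up merged into one entry; with collection keys, each collection keeps its own set.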





[GitHub] [beam] ibzib merged pull request #16113: [BEAM-12976] Implement pipeline visitor to get global field access in…

Posted by GitBox <gi...@apache.org>.
ibzib merged pull request #16113:
URL: https://github.com/apache/beam/pull/16113


   





[GitHub] [beam] ibzib commented on a change in pull request #16113: [BEAM-12976] Implement pipeline visitor to get global field access in…

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #16113:
URL: https://github.com/apache/beam/pull/16113#discussion_r764345354



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FieldAccessVisitor.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+
+/** Computes which Schema fields are (or conversely, are not) accessed in a pipeline. */
+class FieldAccessVisitor extends PipelineVisitor.Defaults {
+  private final Map<String, FieldAccessDescriptor> pCollectionFieldAccess = new HashMap<>();
+
+  /**
+   * Returns a map keyed by the {@link TupleTag} ID referencing a PCollection. Values are the set of
+   * fields accessed on that PCollection.
+   */
+  ImmutableMap<String, FieldAccessDescriptor> getPCollectionFieldAccess() {
+    return ImmutableMap.copyOf(pCollectionFieldAccess);
+  }
+
+  @Override
+  public void visitPrimitiveTransform(Node node) {
+    Map<String, FieldAccessDescriptor> currentFieldAccess = getFieldAccess(node);
+    for (Entry<String, FieldAccessDescriptor> entry : currentFieldAccess.entrySet()) {
+      FieldAccessDescriptor previousFieldAccess = pCollectionFieldAccess.get(entry.getKey());
+      FieldAccessDescriptor newFieldAccess =
+          previousFieldAccess == null
+              ? entry.getValue()
+              : FieldAccessDescriptor.union(
+                  ImmutableList.of(previousFieldAccess, entry.getValue()));
+      pCollectionFieldAccess.put(entry.getKey(), newFieldAccess);
+    }
+  }
+
+  private static Map<String, FieldAccessDescriptor> getFieldAccess(Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+    HashMap<String, FieldAccessDescriptor> access = new HashMap<>();
+
+    if (transform instanceof MultiOutput) {
+      DoFn<?, ?> fn = ((MultiOutput<?, ?>) transform).getFn();
+      Pair<TupleTag<?>, PCollection<?>> mainInput = getMainInputTagId(node);
+      FieldAccessDescriptor fields =
+          ParDo.getDoFnSchemaInformation(fn, mainInput.getRight()).getFieldAccessDescriptor();
+      access.put(mainInput.getLeft().getId(), fields);
+    }
+
+    // For every input without field access info, we must assume all fields need to be accessed.
+    for (TupleTag<?> tag : node.getInputs().keySet()) {
+      if (!access.containsKey(tag.getId())) {
+        access.put(tag.getId(), FieldAccessDescriptor.withAllFields());
+      }
+    }
+
+    return ImmutableMap.copyOf(access);
+  }
+
+  private static Pair<TupleTag<?>, PCollection<?>> getMainInputTagId(Node node) {
+    HashSet<TupleTag<?>> mainInputTags = new HashSet<>(node.getInputs().keySet());

Review comment:
       The set only became `mainInputTags` after the additional input tags were subtracted from it. I refactored this a bit.
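
The subtraction described in this comment can be sketched in plain Java as follows. This is an illustration under assumptions, not the refactored code from the pull request: `MainInputSketch`, `mainInputs`, and `onlyMainInput` are hypothetical names, and strings stand in for tags.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

final class MainInputSketch {
  /** Returns the inputs that are not additional inputs, leaving both arguments untouched. */
  static <T> Set<T> mainInputs(Set<T> allInputs, Set<T> additionalInputs) {
    Set<T> result = new LinkedHashSet<>(allInputs);
    result.removeAll(additionalInputs);
    return Collections.unmodifiableSet(result);
  }

  /** Returns the single main input, failing loudly if there is not exactly one. */
  static <T> T onlyMainInput(Set<T> allInputs, Set<T> additionalInputs) {
    Set<T> main = mainInputs(allInputs, additionalInputs);
    if (main.size() != 1) {
      throw new IllegalStateException("Expected exactly one main input, found " + main.size());
    }
    return main.iterator().next();
  }

  public static void main(String[] args) {
    Set<String> all = new LinkedHashSet<>(Arrays.asList("main", "side1", "side2"));
    Set<String> additional = new HashSet<>(Arrays.asList("side1", "side2"));
    System.out.println(onlyMainInput(all, additional)); // prints "main"
  }
}
```

Naming the intermediate set only after the subtraction has happened avoids a variable called `mainInputTags` that temporarily holds every input.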

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FieldAccessVisitor.java
##########
@@ -0,0 +1,98 @@
+  private static Pair<TupleTag<?>, PCollection<?>> getMainInputTagId(Node node) {

Review comment:
       Removed per your other comment.

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FieldAccessVisitor.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+
+/** Computes which Schema fields are (or conversely, are not) accessed in a pipeline. */
+class FieldAccessVisitor extends PipelineVisitor.Defaults {
+  private final Map<String, FieldAccessDescriptor> pCollectionFieldAccess = new HashMap<>();
+
+  /**
+   * Returns a map keyed by the {@link TupleTag} ID referencing a PCollection. Values are the set of
+   * fields accessed on that PCollection.
+   */
+  ImmutableMap<String, FieldAccessDescriptor> getPCollectionFieldAccess() {
+    return ImmutableMap.copyOf(pCollectionFieldAccess);
+  }
+
+  @Override
+  public void visitPrimitiveTransform(Node node) {
+    Map<String, FieldAccessDescriptor> currentFieldAccess = getFieldAccess(node);
+    for (Entry<String, FieldAccessDescriptor> entry : currentFieldAccess.entrySet()) {
+      FieldAccessDescriptor previousFieldAccess = pCollectionFieldAccess.get(entry.getKey());
+      FieldAccessDescriptor newFieldAccess =
+          previousFieldAccess == null
+              ? entry.getValue()
+              : FieldAccessDescriptor.union(
+                  ImmutableList.of(previousFieldAccess, entry.getValue()));
+      pCollectionFieldAccess.put(entry.getKey(), newFieldAccess);
+    }
+  }
+
+  private static Map<String, FieldAccessDescriptor> getFieldAccess(Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+    HashMap<String, FieldAccessDescriptor> access = new HashMap<>();
+
+    if (transform instanceof MultiOutput) {
+      DoFn<?, ?> fn = ((MultiOutput<?, ?>) transform).getFn();
+      Pair<TupleTag<?>, PCollection<?>> mainInput = getMainInputTagId(node);

Review comment:
       Done.

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FieldAccessVisitor.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+
+/** Computes which Schema fields are (or conversely, are not) accessed in a pipeline. */
+class FieldAccessVisitor extends PipelineVisitor.Defaults {
+  private final Map<String, FieldAccessDescriptor> pCollectionFieldAccess = new HashMap<>();
+
+  /**
+   * Returns a map keyed by the {@link TupleTag} ID referencing a PCollection. Values are the set of
+   * fields accessed on that PCollection.
+   */
+  ImmutableMap<String, FieldAccessDescriptor> getPCollectionFieldAccess() {
+    return ImmutableMap.copyOf(pCollectionFieldAccess);
+  }
+
+  @Override
+  public void visitPrimitiveTransform(Node node) {
+    Map<String, FieldAccessDescriptor> currentFieldAccess = getFieldAccess(node);
+    for (Entry<String, FieldAccessDescriptor> entry : currentFieldAccess.entrySet()) {
+      FieldAccessDescriptor previousFieldAccess = pCollectionFieldAccess.get(entry.getKey());
+      FieldAccessDescriptor newFieldAccess =
+          previousFieldAccess == null
+              ? entry.getValue()
+              : FieldAccessDescriptor.union(
+                  ImmutableList.of(previousFieldAccess, entry.getValue()));
+      pCollectionFieldAccess.put(entry.getKey(), newFieldAccess);
+    }
+  }
+
+  private static Map<String, FieldAccessDescriptor> getFieldAccess(Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+    HashMap<String, FieldAccessDescriptor> access = new HashMap<>();
+
+    if (transform instanceof MultiOutput) {
+      DoFn<?, ?> fn = ((MultiOutput<?, ?>) transform).getFn();
+      Pair<TupleTag<?>, PCollection<?>> mainInput = getMainInputTagId(node);
+      FieldAccessDescriptor fields =
+          ParDo.getDoFnSchemaInformation(fn, mainInput.getRight()).getFieldAccessDescriptor();

Review comment:
       `getFieldAccessDescriptor` returns a vacuous "all fields" descriptor for DoFns without actual schema information, so I copied that convention. I added a test case with no schemas.
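
The "all fields" convention and the union step in `visitPrimitiveTransform` can be illustrated with a small stand-alone model. This is only a sketch: `FieldAccessSketch` and its methods are hypothetical names, not the Beam `FieldAccessDescriptor` API, and it assumes that a union with "all fields" yields "all fields".

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical, simplified stand-in for a field access descriptor (not the Beam class). */
final class FieldAccessSketch {
  private final boolean allFields;
  private final Set<String> fieldNames;

  private FieldAccessSketch(boolean allFields, Set<String> fieldNames) {
    this.allFields = allFields;
    this.fieldNames = Collections.unmodifiableSet(new TreeSet<>(fieldNames));
  }

  /** Conservative default used when an input carries no schema information. */
  static FieldAccessSketch withAllFields() {
    return new FieldAccessSketch(true, Collections.<String>emptySet());
  }

  /** Access restricted to the named top-level fields. */
  static FieldAccessSketch withFieldNames(String... names) {
    return new FieldAccessSketch(false, new TreeSet<>(Arrays.asList(names)));
  }

  boolean accessesAllFields() {
    return allFields;
  }

  Set<String> fieldNames() {
    return fieldNames;
  }

  /** Assumed semantics: "all fields" absorbs everything, otherwise the names are merged. */
  static FieldAccessSketch union(FieldAccessSketch a, FieldAccessSketch b) {
    if (a.allFields || b.allFields) {
      return withAllFields();
    }
    Set<String> merged = new TreeSet<>(a.fieldNames);
    merged.addAll(b.fieldNames);
    return new FieldAccessSketch(false, merged);
  }
}
```

Under this model, one consumer without schema information widens the recorded access for its input to "all fields", which is the safe result for any later projection pushdown.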

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FieldAccessVisitor.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+
+/** Computes which Schema fields are (or conversely, are not) accessed in a pipeline. */
+class FieldAccessVisitor extends PipelineVisitor.Defaults {
+  private final Map<String, FieldAccessDescriptor> pCollectionFieldAccess = new HashMap<>();

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] apilloud commented on pull request #16113: [BEAM-12976] Implement pipeline visitor to get global field access in…

Posted by GitBox <gi...@apache.org>.
apilloud commented on pull request #16113:
URL: https://github.com/apache/beam/pull/16113#issuecomment-988252842


   My understanding is that TupleTags are edge labels that are unique only on the consuming node; they are never globally unique.
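
A stand-alone sketch of why this matters for the map key. Everything here is hypothetical (`TagKeySketch`, `Edge`, and the collection names are illustrative, not Beam types): two consumers use the same tag ID for different inputs, so a map keyed by tag ID merges their field access, while a map keyed by the collection keeps them apart.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of input edges; none of these types are Beam classes. */
final class TagKeySketch {
  /** One input edge: a consumer-local tag ID, the collection it reads, and the fields used. */
  static final class Edge {
    final String tagId;
    final String collection;
    final Set<String> fields;

    Edge(String tagId, String collection, String... fields) {
      this.tagId = tagId;
      this.collection = collection;
      this.fields = new TreeSet<>(Arrays.asList(fields));
    }
  }

  /** Keys by tag ID: edges from different consumers that reuse an ID collapse together. */
  static Map<String, Set<String>> accessByTagId(Edge... edges) {
    Map<String, Set<String>> result = new HashMap<>();
    for (Edge e : edges) {
      result.computeIfAbsent(e.tagId, k -> new TreeSet<>()).addAll(e.fields);
    }
    return result;
  }

  /** Keys by the collection being read: each collection keeps its own field set. */
  static Map<String, Set<String>> accessByCollection(Edge... edges) {
    Map<String, Set<String>> result = new HashMap<>();
    for (Edge e : edges) {
      result.computeIfAbsent(e.collection, k -> new TreeSet<>()).addAll(e.fields);
    }
    return result;
  }
}
```

In this model the tag-keyed map reports fields for an input that never had them, which is the kind of cross-contamination a `PCollection`-keyed map would avoid.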





[GitHub] [beam] apilloud commented on a change in pull request #16113: [BEAM-12976] Implement pipeline visitor to get global field access in…

Posted by GitBox <gi...@apache.org>.
apilloud commented on a change in pull request #16113:
URL: https://github.com/apache/beam/pull/16113#discussion_r763294590



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FieldAccessVisitor.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+
+/** Computes which Schema fields are (or conversely, are not) accessed in a pipeline. */
+class FieldAccessVisitor extends PipelineVisitor.Defaults {
+  private final Map<String, FieldAccessDescriptor> pCollectionFieldAccess = new HashMap<>();
+
+  /**
+   * Returns a map keyed by the {@link TupleTag} ID referencing a PCollection. Values are the set of
+   * fields accessed on that PCollection.
+   */
+  ImmutableMap<String, FieldAccessDescriptor> getPCollectionFieldAccess() {
+    return ImmutableMap.copyOf(pCollectionFieldAccess);
+  }
+
+  @Override
+  public void visitPrimitiveTransform(Node node) {
+    Map<String, FieldAccessDescriptor> currentFieldAccess = getFieldAccess(node);
+    for (Entry<String, FieldAccessDescriptor> entry : currentFieldAccess.entrySet()) {
+      FieldAccessDescriptor previousFieldAccess = pCollectionFieldAccess.get(entry.getKey());
+      FieldAccessDescriptor newFieldAccess =
+          previousFieldAccess == null
+              ? entry.getValue()
+              : FieldAccessDescriptor.union(
+                  ImmutableList.of(previousFieldAccess, entry.getValue()));
+      pCollectionFieldAccess.put(entry.getKey(), newFieldAccess);
+    }
+  }
+
+  private static Map<String, FieldAccessDescriptor> getFieldAccess(Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+    HashMap<String, FieldAccessDescriptor> access = new HashMap<>();
+
+    if (transform instanceof MultiOutput) {
+      DoFn<?, ?> fn = ((MultiOutput<?, ?>) transform).getFn();
+      Pair<TupleTag<?>, PCollection<?>> mainInput = getMainInputTagId(node);
+      FieldAccessDescriptor fields =
+          ParDo.getDoFnSchemaInformation(fn, mainInput.getRight()).getFieldAccessDescriptor();
+      access.put(mainInput.getLeft().getId(), fields);
+    }
+
+    // For every input without field access info, we must assume all fields need to be accessed.
+    for (TupleTag<?> tag : node.getInputs().keySet()) {
+      if (!access.containsKey(tag.getId())) {
+        access.put(tag.getId(), FieldAccessDescriptor.withAllFields());
+      }
+    }
+
+    return ImmutableMap.copyOf(access);
+  }
+
+  private static Pair<TupleTag<?>, PCollection<?>> getMainInputTagId(Node node) {

Review comment:
       nit: This is also deceptively named; it appears to actually be `getMainInput`. (It gets both the PCollection and the tag ID.)

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FieldAccessVisitor.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+
+/** Computes which Schema fields are (or conversely, are not) accessed in a pipeline. */
+class FieldAccessVisitor extends PipelineVisitor.Defaults {
+  private final Map<String, FieldAccessDescriptor> pCollectionFieldAccess = new HashMap<>();
+
+  /**
+   * Returns a map keyed by the {@link TupleTag} ID referencing a PCollection. Values are the set of
+   * fields accessed on that PCollection.
+   */
+  ImmutableMap<String, FieldAccessDescriptor> getPCollectionFieldAccess() {
+    return ImmutableMap.copyOf(pCollectionFieldAccess);
+  }
+
+  @Override
+  public void visitPrimitiveTransform(Node node) {
+    Map<String, FieldAccessDescriptor> currentFieldAccess = getFieldAccess(node);
+    for (Entry<String, FieldAccessDescriptor> entry : currentFieldAccess.entrySet()) {
+      FieldAccessDescriptor previousFieldAccess = pCollectionFieldAccess.get(entry.getKey());
+      FieldAccessDescriptor newFieldAccess =
+          previousFieldAccess == null
+              ? entry.getValue()
+              : FieldAccessDescriptor.union(
+                  ImmutableList.of(previousFieldAccess, entry.getValue()));
+      pCollectionFieldAccess.put(entry.getKey(), newFieldAccess);
+    }
+  }
+
+  private static Map<String, FieldAccessDescriptor> getFieldAccess(Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+    HashMap<String, FieldAccessDescriptor> access = new HashMap<>();
+
+    if (transform instanceof MultiOutput) {
+      DoFn<?, ?> fn = ((MultiOutput<?, ?>) transform).getFn();
+      Pair<TupleTag<?>, PCollection<?>> mainInput = getMainInputTagId(node);

Review comment:
       nit: I believe this function reduces readability (especially with the `Pair` return type); it would be cleaner inline.

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FieldAccessVisitor.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+
+/** Computes which Schema fields are (or conversely, are not) accessed in a pipeline. */
+class FieldAccessVisitor extends PipelineVisitor.Defaults {
+  private final Map<String, FieldAccessDescriptor> pCollectionFieldAccess = new HashMap<>();
+
+  /**
+   * Returns a map keyed by the {@link TupleTag} ID referencing a PCollection. Values are the set of
+   * fields accessed on that PCollection.
+   */
+  ImmutableMap<String, FieldAccessDescriptor> getPCollectionFieldAccess() {
+    return ImmutableMap.copyOf(pCollectionFieldAccess);
+  }
+
+  @Override
+  public void visitPrimitiveTransform(Node node) {
+    Map<String, FieldAccessDescriptor> currentFieldAccess = getFieldAccess(node);
+    for (Entry<String, FieldAccessDescriptor> entry : currentFieldAccess.entrySet()) {
+      FieldAccessDescriptor previousFieldAccess = pCollectionFieldAccess.get(entry.getKey());
+      FieldAccessDescriptor newFieldAccess =
+          previousFieldAccess == null
+              ? entry.getValue()
+              : FieldAccessDescriptor.union(
+                  ImmutableList.of(previousFieldAccess, entry.getValue()));
+      pCollectionFieldAccess.put(entry.getKey(), newFieldAccess);
+    }
+  }
+
+  private static Map<String, FieldAccessDescriptor> getFieldAccess(Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+    HashMap<String, FieldAccessDescriptor> access = new HashMap<>();
+
+    if (transform instanceof MultiOutput) {
+      DoFn<?, ?> fn = ((MultiOutput<?, ?>) transform).getFn();
+      Pair<TupleTag<?>, PCollection<?>> mainInput = getMainInputTagId(node);
+      FieldAccessDescriptor fields =
+          ParDo.getDoFnSchemaInformation(fn, mainInput.getRight()).getFieldAccessDescriptor();
+      access.put(mainInput.getLeft().getId(), fields);
+    }
+
+    // For every input without field access info, we must assume all fields need to be accessed.
+    for (TupleTag<?> tag : node.getInputs().keySet()) {
+      if (!access.containsKey(tag.getId())) {
+        access.put(tag.getId(), FieldAccessDescriptor.withAllFields());
+      }
+    }
+
+    return ImmutableMap.copyOf(access);
+  }
+
+  private static Pair<TupleTag<?>, PCollection<?>> getMainInputTagId(Node node) {
+    HashSet<TupleTag<?>> mainInputTags = new HashSet<>(node.getInputs().keySet());

Review comment:
       nit: This is deceptively named; isn't this just `inputTags`?

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FieldAccessVisitor.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+
+/** Computes which Schema fields are (or conversely, are not) accessed in a pipeline. */
+class FieldAccessVisitor extends PipelineVisitor.Defaults {
+  private final Map<String, FieldAccessDescriptor> pCollectionFieldAccess = new HashMap<>();
+
+  /**
+   * Returns a map keyed by the {@link TupleTag} ID referencing a PCollection. Values are the set of
+   * fields accessed on that PCollection.
+   */
+  ImmutableMap<String, FieldAccessDescriptor> getPCollectionFieldAccess() {
+    return ImmutableMap.copyOf(pCollectionFieldAccess);
+  }
+
+  @Override
+  public void visitPrimitiveTransform(Node node) {
+    Map<String, FieldAccessDescriptor> currentFieldAccess = getFieldAccess(node);
+    for (Entry<String, FieldAccessDescriptor> entry : currentFieldAccess.entrySet()) {
+      FieldAccessDescriptor previousFieldAccess = pCollectionFieldAccess.get(entry.getKey());
+      FieldAccessDescriptor newFieldAccess =
+          previousFieldAccess == null
+              ? entry.getValue()
+              : FieldAccessDescriptor.union(
+                  ImmutableList.of(previousFieldAccess, entry.getValue()));
+      pCollectionFieldAccess.put(entry.getKey(), newFieldAccess);
+    }
+  }
+
+  private static Map<String, FieldAccessDescriptor> getFieldAccess(Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+    HashMap<String, FieldAccessDescriptor> access = new HashMap<>();
+
+    if (transform instanceof MultiOutput) {
+      DoFn<?, ?> fn = ((MultiOutput<?, ?>) transform).getFn();
+      Pair<TupleTag<?>, PCollection<?>> mainInput = getMainInputTagId(node);
+      FieldAccessDescriptor fields =
+          ParDo.getDoFnSchemaInformation(fn, mainInput.getRight()).getFieldAccessDescriptor();

Review comment:
       How will this handle inputs that don't have schemas (and below for other inputs)?

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FieldAccessVisitor.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+
+/** Computes which Schema fields are (or conversely, are not) accessed in a pipeline. */
+class FieldAccessVisitor extends PipelineVisitor.Defaults {
+  private final Map<String, FieldAccessDescriptor> pCollectionFieldAccess = new HashMap<>();

Review comment:
       Shouldn't this be `Map<PCollection, FieldAccessDescriptor>`?







[GitHub] [beam] ibzib commented on pull request #16113: [BEAM-12976] Implement pipeline visitor to get global field access in…

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #16113:
URL: https://github.com/apache/beam/pull/16113#issuecomment-985134223


   R: @apilloud 





[GitHub] [beam] ibzib commented on pull request #16113: [BEAM-12976] Implement pipeline visitor to get global field access in…

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #16113:
URL: https://github.com/apache/beam/pull/16113#issuecomment-988288033


   RUN JAVA PRECOMMIT

