Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/08 22:36:16 UTC

[GitHub] [beam] ibzib opened a new pull request #16176: [BEAM-12976] Pipeline visitor to discover pushdown opportunities.

ibzib opened a new pull request #16176:
URL: https://github.com/apache/beam/pull/16176


   Step 2 of the projection pushdown optimizer: a pipeline visitor that discovers pushdown opportunities. Step 1 was #16113; step 3 will rewrite the graph based on the result of step 2.
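The discovery step can be modeled without Beam. Below is a minimal, self-contained sketch that assumes a flat list of producers (no composite nesting) and uses plain strings in place of PCollections. `FieldAccess`, `Producer`, and `PushdownDiscovery` are hypothetical names, not Beam APIs. The sketch follows the rule stated in the `getPushdownOpportunities()` Javadoc quoted in the review thread: an output is recorded only when the pipeline reads a proper subset of its fields, and a producer with no such outputs is left out entirely.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Beam's FieldAccessDescriptor.
final class FieldAccess {
  final boolean allFields;  // true when consumers read every field
  final Set<String> fields; // accessed fields when allFields is false

  FieldAccess(boolean allFields, Set<String> fields) {
    this.allFields = allFields;
    this.fields = fields;
  }
}

// Hypothetical stand-in for a ProjectionProducer and its output PCollections.
final class Producer {
  final String name;
  final boolean supportsPushdown;
  final List<String> outputs;

  Producer(String name, boolean supportsPushdown, List<String> outputs) {
    this.name = name;
    this.supportsPushdown = supportsPushdown;
    this.outputs = outputs;
  }
}

final class PushdownDiscovery {
  /** Maps producer name to (output name to accessed fields), for proper subsets only. */
  static Map<String, Map<String, Set<String>>> discover(
      List<Producer> producers, Map<String, FieldAccess> fieldAccess) {
    Map<String, Map<String, Set<String>>> opportunities = new LinkedHashMap<>();
    for (Producer p : producers) {
      if (!p.supportsPushdown) {
        continue; // nothing can be pushed into this producer
      }
      Map<String, Set<String>> local = new LinkedHashMap<>();
      for (String output : p.outputs) {
        FieldAccess access = fieldAccess.get(output);
        // Unknown access, or access to all fields, means nothing can be pruned.
        if (access != null && !access.allFields) {
          local.put(output, access.fields);
        }
      }
      if (!local.isEmpty()) {
        opportunities.put(p.name, local);
      }
    }
    return opportunities;
  }
}
```

In the real visitor the per-PCollection field access comes from step 1 (`FieldAccessVisitor`), and the traversal walks the transform hierarchy rather than a list.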
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make the review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   `ValidatesRunner` compliance status (on master branch)
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/badge/icon?subject=V1+Streaming">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon?subject=V1+Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/badge/icon?subject=V2+Streaming">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon?subject=Java+8">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon?subject=Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon?subject=Portable+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/badge/icon?subject=Portable">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon?subject=Structured+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon?subject=ValCont">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Examples testing status on various runners
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/badge/icon?subject=V1+Java11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Post-Commit SDK/Transform Integration Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Go</th>
         <th>Java</th>
         <th>Python</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon?subject=3.6">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon?subject=3.7">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon?subject=3.8">
           </a>
         </td>
       </tr>
     </tbody>
   </table>
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>---</th>
         <th>Java</th>
         <th>Python</th>
         <th>Go</th>
         <th>Website</th>
         <th>Whitespace</th>
         <th>Typescript</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Non-portable</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon?subject=Tests">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon?subject=Lint">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon?subject=Docker">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon?subject=Docs">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Portable</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for the trigger phrases, status, and links of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ibzib merged pull request #16176: [BEAM-12976] Pipeline visitor to discover pushdown opportunities.

Posted by GitBox <gi...@apache.org>.
ibzib merged pull request #16176:
URL: https://github.com/apache/beam/pull/16176


   





[GitHub] [beam] apilloud commented on a change in pull request #16176: [BEAM-12976] Pipeline visitor to discover pushdown opportunities.

Posted by GitBox <gi...@apache.org>.
apilloud commented on a change in pull request #16176:
URL: https://github.com/apache/beam/pull/16176#discussion_r766212376



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PushdownProjectorVisitor.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.ProjectionProducer;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/** A {@link PipelineVisitor} to discover projection pushdown opportunities. */
+class PushdownProjectorVisitor extends PipelineVisitor.Defaults {
+  private final Map<PCollection<?>, FieldAccessDescriptor> pCollectionFieldAccess;
+  private final Map<
+          ProjectionProducer<PTransform<?, ?>>, Map<PCollection<?>, FieldAccessDescriptor>>
+      pushdownOpportunities = new HashMap<>();

Review comment:
       nit: It looks like this is write-only; consider using ImmutableMap.Builder.
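To illustrate the write-only pattern the reviewer describes, here is a JDK-only sketch, assuming no Guava on the classpath. `WriteOnlyAccumulator` is a hypothetical class, not the PR's code or a Guava type: the traversal only ever calls `put`, and readers only ever see an unmodifiable snapshot, which is the role `ImmutableMap.Builder` would play in the visitor.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical JDK analog of accumulating into ImmutableMap.Builder.
final class WriteOnlyAccumulator<K, V> {
  // Written during the traversal, never read directly by callers.
  private final Map<K, V> entries = new LinkedHashMap<>();

  void put(K key, V value) {
    // Guava's builder also rejects null keys and values.
    Objects.requireNonNull(key, "key");
    Objects.requireNonNull(value, "value");
    // Guava's ImmutableMap.Builder#build() throws IllegalArgumentException on
    // duplicate keys; this sketch fails earlier, at put time.
    if (entries.putIfAbsent(key, value) != null) {
      throw new IllegalArgumentException("duplicate key: " + key);
    }
  }

  // Returns an unmodifiable copy that keeps insertion order, like ImmutableMap.
  Map<K, V> build() {
    return Collections.unmodifiableMap(new LinkedHashMap<>(entries));
  }
}
```

One practical difference from a `HashMap` field: a plain map silently overwrites a repeated key, while a builder surfaces it as an error.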

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PushdownProjectorVisitor.java
##########
@@ -0,0 +1,85 @@
+
+  /**
+   * @param pCollectionFieldAccess A map from PCollection to the fields the pipeline accesses on
+   *     that PCollection. See {@link FieldAccessVisitor}.
+   */
+  PushdownProjectorVisitor(Map<PCollection<?>, FieldAccessDescriptor> pCollectionFieldAccess) {
+    this.pCollectionFieldAccess = pCollectionFieldAccess;
+  }
+
+  /**
+   * A map from {@link ProjectionProducer} to an inner map keyed by output PCollections of that
+   * producer. For each PCollection, the value is the {@link FieldAccessDescriptor} describing the
+   * smallest possible subset of fields the producer is required to return on that PCollection. If
+   * there is no proper subset, the result is not included.
+   */
+  Map<ProjectionProducer<PTransform<?, ?>>, Map<PCollection<?>, FieldAccessDescriptor>>
+      getPushdownOpportunities() {
+    return ImmutableMap.copyOf(pushdownOpportunities);
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+    if (!(transform instanceof ProjectionProducer)) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+    ProjectionProducer<PTransform<?, ?>> pushdownProjector =
+        (ProjectionProducer<PTransform<?, ?>>) transform;
+    if (!pushdownProjector.supportsProjectionPushdown()) {
+      return CompositeBehavior.ENTER_TRANSFORM;

Review comment:
       What would be the downside of making this `DO_NOT_ENTER_TRANSFORM`?
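In Beam's `PipelineVisitor`, returning `DO_NOT_ENTER_TRANSFORM` from `enterCompositeTransform` means the visitor does not visit that composite's sub-transforms. The toy traversal below makes one possible consequence concrete: if a producer that does not support pushdown wraps one that does, skipping it hides the inner producer. `ToyNode` and `ToyVisitor` are hypothetical types, not Beam's `TransformHierarchy`, and whether real pipelines nest producers this way is not established in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical transform tree node.
final class ToyNode {
  enum Kind { OTHER, PRODUCER_WITHOUT_PUSHDOWN, PRODUCER_WITH_PUSHDOWN }

  final String name;
  final Kind kind;
  final List<ToyNode> children;

  ToyNode(String name, Kind kind, ToyNode... children) {
    this.name = name;
    this.kind = kind;
    this.children = List.of(children);
  }
}

final class ToyVisitor {
  // Local stand-in for PipelineVisitor.CompositeBehavior.
  enum CompositeBehavior { ENTER_TRANSFORM, DO_NOT_ENTER_TRANSFORM }

  /** Names of pushdown-capable producers reached, given the choice for unsupported producers. */
  static List<String> pushdownCandidates(ToyNode root, CompositeBehavior forUnsupported) {
    List<String> found = new ArrayList<>();
    walk(root, forUnsupported, found);
    return found;
  }

  private static void walk(ToyNode node, CompositeBehavior forUnsupported, List<String> found) {
    if (node.kind == ToyNode.Kind.PRODUCER_WITH_PUSHDOWN) {
      found.add(node.name);
    } else if (node.kind == ToyNode.Kind.PRODUCER_WITHOUT_PUSHDOWN
        && forUnsupported == CompositeBehavior.DO_NOT_ENTER_TRANSFORM) {
      return; // this composite's children are never visited
    }
    // Toy choice: every other node is entered.
    for (ToyNode child : node.children) {
      walk(child, forUnsupported, found);
    }
  }
}
```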

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PushdownProjectorVisitor.java
##########
@@ -0,0 +1,85 @@
+  @Override
+  public CompositeBehavior enterCompositeTransform(Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+    if (!(transform instanceof ProjectionProducer)) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+    ProjectionProducer<PTransform<?, ?>> pushdownProjector =
+        (ProjectionProducer<PTransform<?, ?>>) transform;
+    if (!pushdownProjector.supportsProjectionPushdown()) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    ImmutableMap.Builder<PCollection<?>, FieldAccessDescriptor> builder = ImmutableMap.builder();
+    for (PCollection<?> output : node.getOutputs().values()) {
+      FieldAccessDescriptor fieldAccess = pCollectionFieldAccess.get(output);
+      if (fieldAccess != null && !fieldAccess.getAllFields()) {
+        builder.put(output, fieldAccess);
+      }
+    }
+    Map<PCollection<?>, FieldAccessDescriptor> localOpportunities = builder.build();
+    if (localOpportunities.isEmpty()) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    } else {

Review comment:
       nit: it might be slightly cleaner if you flip this to `!isEmpty()` and drop the else.
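The suggested shape handles the non-empty case first and then falls through, so no `else` is needed. The quoted diff is cut off before the `else` body, so in this sketch the value returned after recording is a placeholder assumption, as is the hypothetical `GuardFlip` class and its string-typed maps.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class GuardFlip {
  // Local stand-in for PipelineVisitor.CompositeBehavior.
  enum CompositeBehavior { ENTER_TRANSFORM, DO_NOT_ENTER_TRANSFORM }

  final Map<String, Map<String, String>> pushdownOpportunities = new LinkedHashMap<>();

  CompositeBehavior record(String producer, Map<String, String> localOpportunities) {
    if (!localOpportunities.isEmpty()) {
      pushdownOpportunities.put(producer, localOpportunities);
      // ASSUMPTION: placeholder return value; the PR's actual else body is not quoted.
      return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
    }
    // No proper-subset outputs: nothing to record, keep traversing.
    return CompositeBehavior.ENTER_TRANSFORM;
  }
}
```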

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PushdownProjectorVisitor.java
##########
@@ -0,0 +1,85 @@
+  @Override
+  public CompositeBehavior enterCompositeTransform(Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+    if (!(transform instanceof ProjectionProducer)) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+    ProjectionProducer<PTransform<?, ?>> pushdownProjector =
+        (ProjectionProducer<PTransform<?, ?>>) transform;
+    if (!pushdownProjector.supportsProjectionPushdown()) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    ImmutableMap.Builder<PCollection<?>, FieldAccessDescriptor> builder = ImmutableMap.builder();
+    for (PCollection<?> output : node.getOutputs().values()) {
+      FieldAccessDescriptor fieldAccess = pCollectionFieldAccess.get(output);
+      if (fieldAccess != null && !fieldAccess.getAllFields()) {
+        builder.put(output, fieldAccess);
+      }
+    }
+    Map<PCollection<?>, FieldAccessDescriptor> localOpportunities = builder.build();
+    if (localOpportunities.isEmpty()) {
+      return CompositeBehavior.ENTER_TRANSFORM;

Review comment:
       Same question: what would be the downside of making this `DO_NOT_ENTER_TRANSFORM`?

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PushdownProjectorVisitor.java
##########
@@ -0,0 +1,85 @@
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.ProjectionProducer;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/** A {@link PipelineVisitor} to discover projection pushdown opportunities. */
+class PushdownProjectorVisitor extends PipelineVisitor.Defaults {
+  private final Map<PCollection<?>, FieldAccessDescriptor> pCollectionFieldAccess;
+  private final Map<
+          ProjectionProducer<PTransform<?, ?>>, Map<PCollection<?>, FieldAccessDescriptor>>
+      pushdownOpportunities = new HashMap<>();
+
+  /**
+   * @param pCollectionFieldAccess A map from PCollection to the fields the pipeline accesses on
+   *     that PCollection. See {@link FieldAccessVisitor}.
+   */
+  PushdownProjectorVisitor(Map<PCollection<?>, FieldAccessDescriptor> pCollectionFieldAccess) {
+    this.pCollectionFieldAccess = pCollectionFieldAccess;
+  }
+
+  /**
+   * A map from {@link ProjectionProducer} to an inner map keyed by output PCollections of that
+   * producer. For each PCollection, the value is the {@link FieldAccessDescriptor} describing the
+   * smallest possible subset of fields the producer is required to return on that PCollection. If
+   * there is no proper subset, the result is not included.
+   */
+  Map<ProjectionProducer<PTransform<?, ?>>, Map<PCollection<?>, FieldAccessDescriptor>>
+      getPushdownOpportunities() {
+    return ImmutableMap.copyOf(pushdownOpportunities);
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+    if (!(transform instanceof ProjectionProducer)) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+    ProjectionProducer<PTransform<?, ?>> pushdownProjector =
+        (ProjectionProducer<PTransform<?, ?>>) transform;
+    if (!pushdownProjector.supportsProjectionPushdown()) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    ImmutableMap.Builder<PCollection<?>, FieldAccessDescriptor> builder = ImmutableMap.builder();

Review comment:
       It looks like `actuateProjectionPushdown` operates on `TupleTag`s. Should this match?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ibzib commented on pull request #16176: [BEAM-12976] Pipeline visitor to discover pushdown opportunities.

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #16176:
URL: https://github.com/apache/beam/pull/16176#issuecomment-989326892


   R: @apilloud 





[GitHub] [beam] ibzib commented on pull request #16176: [BEAM-12976] Pipeline visitor to discover pushdown opportunities.

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #16176:
URL: https://github.com/apache/beam/pull/16176#issuecomment-989340740


   Run Java PreCommit





[GitHub] [beam] ibzib commented on a change in pull request #16176: [BEAM-12976] Pipeline visitor to discover pushdown opportunities.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #16176:
URL: https://github.com/apache/beam/pull/16176#discussion_r766235040



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PushdownProjectorVisitor.java
##########
@@ -0,0 +1,85 @@
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.ProjectionProducer;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/** A {@link PipelineVisitor} to discover projection pushdown opportunities. */
+class PushdownProjectorVisitor extends PipelineVisitor.Defaults {
+  private final Map<PCollection<?>, FieldAccessDescriptor> pCollectionFieldAccess;
+  private final Map<
+          ProjectionProducer<PTransform<?, ?>>, Map<PCollection<?>, FieldAccessDescriptor>>
+      pushdownOpportunities = new HashMap<>();
+
+  /**
+   * @param pCollectionFieldAccess A map from PCollection to the fields the pipeline accesses on
+   *     that PCollection. See {@link FieldAccessVisitor}.
+   */
+  PushdownProjectorVisitor(Map<PCollection<?>, FieldAccessDescriptor> pCollectionFieldAccess) {
+    this.pCollectionFieldAccess = pCollectionFieldAccess;
+  }
+
+  /**
+   * A map from {@link ProjectionProducer} to an inner map keyed by output PCollections of that
+   * producer. For each PCollection, the value is the {@link FieldAccessDescriptor} describing the
+   * smallest possible subset of fields the producer is required to return on that PCollection. If
+   * there is no proper subset, the result is not included.
+   */
+  Map<ProjectionProducer<PTransform<?, ?>>, Map<PCollection<?>, FieldAccessDescriptor>>
+      getPushdownOpportunities() {
+    return ImmutableMap.copyOf(pushdownOpportunities);
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+    if (!(transform instanceof ProjectionProducer)) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+    ProjectionProducer<PTransform<?, ?>> pushdownProjector =
+        (ProjectionProducer<PTransform<?, ?>>) transform;
+    if (!pushdownProjector.supportsProjectionPushdown()) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    ImmutableMap.Builder<PCollection<?>, FieldAccessDescriptor> builder = ImmutableMap.builder();

Review comment:
       No. `actuateProjectionPushdown` needs to operate on local `TupleTag`s because the actual PCollection has not yet been created when we are defining a PTransform, whereas here we want a global reference because the map describes the entire pipeline.
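
The following toy shows why a pipeline-wide map wants global keys. It uses a hypothetical scenario in which two instances of the same transform class both label their main output with the tag id `"output"`. Tags are modeled as `String`s, which compare by value, and PCollections as a toy `PColl` class, which compares by identity. Neither class is Beam's.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class GlobalVsLocalKeys {
  // Hypothetical stand-in for a PCollection: no equals() override, so each
  // instance is a distinct key, like distinct nodes in the pipeline graph.
  static final class PColl {
    final String producer;

    PColl(String producer) {
      this.producer = producer;
    }
  }

  // Pipeline-wide map keyed by local tag id: transform B's entry overwrites A's.
  static Map<String, Set<String>> keyedByTag() {
    Map<String, Set<String>> byTag = new HashMap<>();
    byTag.put("output", Set.of("id"));   // from transform instance A
    byTag.put("output", Set.of("name")); // from transform instance B, same tag id
    return byTag;
  }

  // Pipeline-wide map keyed by the PCollection itself: both entries survive.
  static Map<PColl, Set<String>> keyedByPCollection() {
    Map<PColl, Set<String>> byPColl = new HashMap<>();
    byPColl.put(new PColl("A"), Set.of("id"));
    byPColl.put(new PColl("B"), Set.of("name"));
    return byPColl;
  }
}
```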







[GitHub] [beam] ibzib commented on pull request #16176: [BEAM-12976] Pipeline visitor to discover pushdown opportunities.

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #16176:
URL: https://github.com/apache/beam/pull/16176#issuecomment-991336946


   Run Java PreCommit





[GitHub] [beam] ibzib commented on a change in pull request #16176: [BEAM-12976] Pipeline visitor to discover pushdown opportunities.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #16176:
URL: https://github.com/apache/beam/pull/16176#discussion_r768218916



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PushdownProjectorVisitor.java
##########
@@ -0,0 +1,85 @@
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.ProjectionProducer;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/** A {@link PipelineVisitor} to discover projection pushdown opportunities. */
+class PushdownProjectorVisitor extends PipelineVisitor.Defaults {
+  private final Map<PCollection<?>, FieldAccessDescriptor> pCollectionFieldAccess;
+  private final Map<
+          ProjectionProducer<PTransform<?, ?>>, Map<PCollection<?>, FieldAccessDescriptor>>
+      pushdownOpportunities = new HashMap<>();
+
+  /**
+   * @param pCollectionFieldAccess A map from PCollection to the fields the pipeline accesses on
+   *     that PCollection. See {@link FieldAccessVisitor}.
+   */
+  PushdownProjectorVisitor(Map<PCollection<?>, FieldAccessDescriptor> pCollectionFieldAccess) {
+    this.pCollectionFieldAccess = pCollectionFieldAccess;
+  }
+
+  /**
+   * A map from {@link ProjectionProducer} to an inner map keyed by output PCollections of that
+   * producer. For each PCollection, the value is the {@link FieldAccessDescriptor} describing the
+   * smallest possible subset of fields the producer is required to return on that PCollection. If
+   * there is no proper subset, the result is not included.
+   */
+  Map<ProjectionProducer<PTransform<?, ?>>, Map<PCollection<?>, FieldAccessDescriptor>>
+      getPushdownOpportunities() {
+    return ImmutableMap.copyOf(pushdownOpportunities);
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+    if (!(transform instanceof ProjectionProducer)) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+    ProjectionProducer<PTransform<?, ?>> pushdownProjector =
+        (ProjectionProducer<PTransform<?, ?>>) transform;
+    if (!pushdownProjector.supportsProjectionPushdown()) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    ImmutableMap.Builder<PCollection<?>, FieldAccessDescriptor> builder = ImmutableMap.builder();

Review comment:
       Now that the map is keyed by transform, we no longer need to return global references here. But it looks like getting local references will require different traversal behavior than we have here, so it's better left to a subsequent visitor.
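
A later step could translate the global, PCollection-keyed entries into the local tags a transform understands. The sketch below assumes the transform's outputs are available as a tag-to-PCollection map, which is the shape of `Node.getOutputs()` in Beam. It uses hypothetical `String` tags and a toy `PColl` class, and it deliberately ignores the traversal question raised in the comment.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class LocalTagTranslation {
  // Hypothetical stand-in for a PCollection; identity equality.
  static final class PColl {
    final String name;

    PColl(String name) {
      this.name = name;
    }
  }

  // Re-keys global field-access entries by the producer's local output tags.
  // Outputs without an entry (no pushdown opportunity) are skipped.
  static Map<String, Set<String>> toLocal(
      Map<String, PColl> outputsByTag, Map<PColl, Set<String>> globalAccess) {
    Map<String, Set<String>> local = new LinkedHashMap<>();
    for (Map.Entry<String, PColl> entry : outputsByTag.entrySet()) {
      Set<String> fields = globalAccess.get(entry.getValue());
      if (fields != null) {
        local.put(entry.getKey(), fields);
      }
    }
    return local;
  }
}
```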







[GitHub] [beam] ibzib commented on a change in pull request #16176: [BEAM-12976] Pipeline visitor to discover pushdown opportunities.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #16176:
URL: https://github.com/apache/beam/pull/16176#discussion_r766234902



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PushdownProjectorVisitor.java
##########
@@ -0,0 +1,85 @@
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.ProjectionProducer;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/** A {@link PipelineVisitor} to discover projection pushdown opportunities. */
+class PushdownProjectorVisitor extends PipelineVisitor.Defaults {
+  private final Map<PCollection<?>, FieldAccessDescriptor> pCollectionFieldAccess;
+  private final Map<
+          ProjectionProducer<PTransform<?, ?>>, Map<PCollection<?>, FieldAccessDescriptor>>
+      pushdownOpportunities = new HashMap<>();

Review comment:
       Done.

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PushdownProjectorVisitor.java
##########
@@ -0,0 +1,85 @@
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.ProjectionProducer;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/** A {@link PipelineVisitor} to discover projection pushdown opportunities. */
+class PushdownProjectorVisitor extends PipelineVisitor.Defaults {
+  private final Map<PCollection<?>, FieldAccessDescriptor> pCollectionFieldAccess;
+  private final Map<
+          ProjectionProducer<PTransform<?, ?>>, Map<PCollection<?>, FieldAccessDescriptor>>
+      pushdownOpportunities = new HashMap<>();
+
+  /**
+   * @param pCollectionFieldAccess A map from PCollection to the fields the pipeline accesses on
+   *     that PCollection. See {@link FieldAccessVisitor}.
+   */
+  PushdownProjectorVisitor(Map<PCollection<?>, FieldAccessDescriptor> pCollectionFieldAccess) {
+    this.pCollectionFieldAccess = pCollectionFieldAccess;
+  }
+
+  /**
+   * A map from {@link ProjectionProducer} to an inner map keyed by output PCollections of that
+   * producer. For each PCollection, the value is the {@link FieldAccessDescriptor} describing the
+   * smallest possible subset of fields the producer is required to return on that PCollection. If
+   * there is no proper subset, the result is not included.
+   */
+  Map<ProjectionProducer<PTransform<?, ?>>, Map<PCollection<?>, FieldAccessDescriptor>>
+      getPushdownOpportunities() {
+    return ImmutableMap.copyOf(pushdownOpportunities);
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+    if (!(transform instanceof ProjectionProducer)) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+    ProjectionProducer<PTransform<?, ?>> pushdownProjector =
+        (ProjectionProducer<PTransform<?, ?>>) transform;
+    if (!pushdownProjector.supportsProjectionPushdown()) {
+      return CompositeBehavior.ENTER_TRANSFORM;

Review comment:
       This is to handle the case of a composite transform where the outer transform doesn't support pushdown but an inner transform does.
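
The effect on traversal can be seen in a toy depth-first walk. The `Node` class here is hypothetical, not `TransformHierarchy.Node`. The sketch simplifies by assuming that any transform supporting pushdown is recorded and not descended into. With `ENTER_TRANSFORM` for unsupported composites, a pushdown-capable read nested inside an unsupported wrapper is still found. With `DO_NOT_ENTER_TRANSFORM` it is skipped.

```java
import java.util.ArrayList;
import java.util.List;

final class NestedProducerTraversal {
  enum Behavior { ENTER_TRANSFORM, DO_NOT_ENTER_TRANSFORM }

  // Hypothetical transform node: a name, a pushdown flag, and component transforms.
  static final class Node {
    final String name;
    final boolean supportsPushdown;
    final List<Node> parts;

    Node(String name, boolean supportsPushdown, Node... parts) {
      this.name = name;
      this.supportsPushdown = supportsPushdown;
      this.parts = List.of(parts);
    }
  }

  // Returns the names of pushdown-capable transforms the walk reaches.
  static List<String> discover(Node root, Behavior whenUnsupported) {
    List<String> found = new ArrayList<>();
    walk(root, whenUnsupported, found);
    return found;
  }

  private static void walk(Node node, Behavior whenUnsupported, List<String> found) {
    if (node.supportsPushdown) {
      found.add(node.name); // assumption: record it and do not descend further
      return;
    }
    // Unsupported composite: only entering it can reach producers nested inside.
    if (whenUnsupported == Behavior.ENTER_TRANSFORM) {
      for (Node part : node.parts) {
        walk(part, whenUnsupported, found);
      }
    }
  }
}
```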

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PushdownProjectorVisitor.java
##########
@@ -0,0 +1,85 @@
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.ProjectionProducer;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/** A {@link PipelineVisitor} to discover projection pushdown opportunities. */
+class PushdownProjectorVisitor extends PipelineVisitor.Defaults {
+  private final Map<PCollection<?>, FieldAccessDescriptor> pCollectionFieldAccess;
+  private final Map<
+          ProjectionProducer<PTransform<?, ?>>, Map<PCollection<?>, FieldAccessDescriptor>>
+      pushdownOpportunities = new HashMap<>();
+
+  /**
+   * @param pCollectionFieldAccess A map from PCollection to the fields the pipeline accesses on
+   *     that PCollection. See {@link FieldAccessVisitor}.
+   */
+  PushdownProjectorVisitor(Map<PCollection<?>, FieldAccessDescriptor> pCollectionFieldAccess) {
+    this.pCollectionFieldAccess = pCollectionFieldAccess;
+  }
+
+  /**
+   * A map from {@link ProjectionProducer} to an inner map keyed by output PCollections of that
+   * producer. For each PCollection, the value is the {@link FieldAccessDescriptor} describing the
+   * smallest possible subset of fields the producer is required to return on that PCollection. If
+   * there is no proper subset, the result is not included.
+   */
+  Map<ProjectionProducer<PTransform<?, ?>>, Map<PCollection<?>, FieldAccessDescriptor>>
+      getPushdownOpportunities() {
+    return ImmutableMap.copyOf(pushdownOpportunities);
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+    if (!(transform instanceof ProjectionProducer)) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+    ProjectionProducer<PTransform<?, ?>> pushdownProjector =
+        (ProjectionProducer<PTransform<?, ?>>) transform;
+    if (!pushdownProjector.supportsProjectionPushdown()) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    ImmutableMap.Builder<PCollection<?>, FieldAccessDescriptor> builder = ImmutableMap.builder();
+    for (PCollection<?> output : node.getOutputs().values()) {
+      FieldAccessDescriptor fieldAccess = pCollectionFieldAccess.get(output);
+      if (fieldAccess != null && !fieldAccess.getAllFields()) {
+        builder.put(output, fieldAccess);
+      }
+    }
+    Map<PCollection<?>, FieldAccessDescriptor> localOpportunities = builder.build();
+    if (localOpportunities.isEmpty()) {
+      return CompositeBehavior.ENTER_TRANSFORM;

Review comment:
       This is to handle the case where a composite transform doesn't have any fields to drop from its input, but one or more of its component transforms might.
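
The second case can be modeled in the same way, this time keyed on field access. In this hypothetical toy, every transform supports pushdown and has a single output. `fieldsUsed` lists the outputs whose consumers read only some fields. The composite's own output is read in full, so it has no local opportunity. An inner read, however, feeds an internal PCollection that only needs `id`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class EmptyLocalOpportunities {
  // Hypothetical transform node with a single output PCollection name.
  static final class Node {
    final String name;
    final String output;
    final List<Node> parts;

    Node(String name, String output, Node... parts) {
      this.name = name;
      this.output = output;
      this.parts = List.of(parts);
    }
  }

  // fieldsUsed: output name -> fields its consumers read. A missing entry means the
  // output is needed in full, i.e. there is nothing to push down for it.
  static List<String> discover(
      Node root, Map<String, Set<String>> fieldsUsed, boolean enterWhenEmpty) {
    List<String> found = new ArrayList<>();
    walk(root, fieldsUsed, enterWhenEmpty, found);
    return found;
  }

  private static void walk(
      Node node, Map<String, Set<String>> fieldsUsed, boolean enterWhenEmpty, List<String> found) {
    if (fieldsUsed.containsKey(node.output)) {
      found.add(node.name); // local opportunity found; assumed not to descend further
      return;
    }
    // No local opportunity: entering is the only way to reach inner producers.
    if (enterWhenEmpty) {
      for (Node part : node.parts) {
        walk(part, fieldsUsed, enterWhenEmpty, found);
      }
    }
  }
}
```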

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PushdownProjectorVisitor.java
##########
@@ -0,0 +1,85 @@
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.ProjectionProducer;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/** A {@link PipelineVisitor} to discover projection pushdown opportunities. */
+class PushdownProjectorVisitor extends PipelineVisitor.Defaults {
+  private final Map<PCollection<?>, FieldAccessDescriptor> pCollectionFieldAccess;
+  private final Map<
+          ProjectionProducer<PTransform<?, ?>>, Map<PCollection<?>, FieldAccessDescriptor>>
+      pushdownOpportunities = new HashMap<>();
+
+  /**
+   * @param pCollectionFieldAccess A map from PCollection to the fields the pipeline accesses on
+   *     that PCollection. See {@link FieldAccessVisitor}.
+   */
+  PushdownProjectorVisitor(Map<PCollection<?>, FieldAccessDescriptor> pCollectionFieldAccess) {
+    this.pCollectionFieldAccess = pCollectionFieldAccess;
+  }
+
+  /**
+   * A map from {@link ProjectionProducer} to an inner map keyed by output PCollections of that
+   * producer. For each PCollection, the value is the {@link FieldAccessDescriptor} describing the
+   * smallest possible subset of fields the producer is required to return on that PCollection. If
+   * there is no proper subset, the result is not included.
+   */
+  Map<ProjectionProducer<PTransform<?, ?>>, Map<PCollection<?>, FieldAccessDescriptor>>
+      getPushdownOpportunities() {
+    return ImmutableMap.copyOf(pushdownOpportunities);
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+    if (!(transform instanceof ProjectionProducer)) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+    ProjectionProducer<PTransform<?, ?>> pushdownProjector =
+        (ProjectionProducer<PTransform<?, ?>>) transform;
+    if (!pushdownProjector.supportsProjectionPushdown()) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    ImmutableMap.Builder<PCollection<?>, FieldAccessDescriptor> builder = ImmutableMap.builder();
+    for (PCollection<?> output : node.getOutputs().values()) {
+      FieldAccessDescriptor fieldAccess = pCollectionFieldAccess.get(output);
+      if (fieldAccess != null && !fieldAccess.getAllFields()) {
+        builder.put(output, fieldAccess);
+      }
+    }
+    Map<PCollection<?>, FieldAccessDescriptor> localOpportunities = builder.build();
+    if (localOpportunities.isEmpty()) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    } else {

Review comment:
       Done.







[GitHub] [beam] ibzib commented on pull request #16176: [BEAM-12976] Pipeline visitor to discover pushdown opportunities.

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #16176:
URL: https://github.com/apache/beam/pull/16176#issuecomment-993032157


   Run Java PreCommit

