You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/08/22 17:51:22 UTC

[GitHub] [beam] je-ik opened a new pull request #15370: [DO NOT MERGE] [BEAM-12704] flink primitive read

je-ik opened a new pull request #15370:
URL: https://github.com/apache/beam/pull/15370


   
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   `ValidatesRunner` compliance status (on master branch)
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/badge/icon?subject=V1+Streaming">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon?subject=V1+Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/badge/icon?subject=V2+Streaming">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon?subject=Java+8">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon?subject=Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon?subject=Portable+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/badge/icon?subject=Portable">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon?subject=Structured+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon?subject=ValCont">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Examples testing status on various runners
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/badge/icon?subject=V1+Java11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Post-Commit SDK/Transform Integration Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Go</th>
         <th>Java</th>
         <th>Python</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon?subject=3.6">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon?subject=3.7">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon?subject=3.8">
           </a>
         </td>
       </tr>
     </tbody>
   </table>
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>---</th>
         <th>Java</th>
         <th>Python</th>
         <th>Go</th>
         <th>Website</th>
         <th>Whitespace</th>
         <th>Typescript</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Non-portable</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon?subject=Tests">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon?subject=Lint">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon?subject=Docker">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon?subject=Docs">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Portable</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik merged pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
je-ik merged pull request #15370:
URL: https://github.com/apache/beam/pull/15370


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #15370:
URL: https://github.com/apache/beam/pull/15370#issuecomment-904895609


   R: @boyuanzz 
   I'd love to get this to 2.33.0, would you help reviewing? I think the change is safe as it applies to portable Flink using use_deprecated_read, which was not supported anyway.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on a change in pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695924052



##########
File path: website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
##########
@@ -77,11 +77,6 @@
   <td>Remove unneeded deep copy between operators. See https://issues.apache.org/jira/browse/BEAM-11146</td>
   <td>Default: <code>false</code></td>
 </tr>
-<tr>
-  <td><code>filesToStage</code></td>

Review comment:
       There is some automated tool that removes that. I reverted the change, but it will simply reappear once the runner is touched.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #15370:
URL: https://github.com/apache/beam/pull/15370#issuecomment-903873932


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dmvk commented on a change in pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695899629



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +596,74 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                 .keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                 .flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
     } catch (Exception e) {
       throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
     }
+  }
+
+  /**
+   * Get SDK coder for given PCollection. The SDK coder is the coder that the SDK-harness would have
+   * used to encode data before passing to the runner over {@link SdkHarnessClient}.

Review comment:
       ```suggestion
      * used to encode data before passing them to the runner over {@link SdkHarnessClient}.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dmvk commented on pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
dmvk commented on pull request #15370:
URL: https://github.com/apache/beam/pull/15370#issuecomment-905684816


   As followup, it would be great if we could run validatesRunner test suite for both deprecated & sdf based sources.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dmvk commented on a change in pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695765175



##########
File path: runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourcePortableTest.java
##########
@@ -90,10 +106,16 @@ public void testExecution() throws Exception {
         .as(PortablePipelineOptions.class)
         .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
     Pipeline p = Pipeline.create(options);
-    PCollection<Long> result = p.apply(GenerateSequence.from(0L).to(10L));
+    PCollection<Long> result =
+        p.apply(Read.from(new Source(10)))
+            // FIXME: the test fails without this

Review comment:
       ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dmvk commented on a change in pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695887301



##########
File path: runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourcePortableTest.java
##########
@@ -90,10 +106,16 @@ public void testExecution() throws Exception {
         .as(PortablePipelineOptions.class)
         .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
     Pipeline p = Pipeline.create(options);
-    PCollection<Long> result = p.apply(GenerateSequence.from(0L).to(10L));
+    PCollection<Long> result =
+        p.apply(Read.from(new Source(10)))
+            // FIXME: the test fails without this

Review comment:
       Can you file a JIRA for it then?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dmvk commented on a change in pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695766964



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +594,55 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                 .keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                 .flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
     } catch (Exception e) {
       throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
     }
+  }
 
-    return source;
+  private static <T> WindowedValue.FullWindowedValueCoder<T> getSdkCoder(
+      String pCollectionId, RunnerApi.Components components) {
+
+    PipelineNode.PCollectionNode pCollectionNode =
+        PipelineNode.pCollection(pCollectionId, components.getPcollectionsOrThrow(pCollectionId));
+    RunnerApi.Components.Builder componentsBuilder = components.toBuilder();
+    String coderId =
+        WireCoders.addSdkWireCoder(
+            pCollectionNode,
+            componentsBuilder,
+            RunnerApi.ExecutableStagePayload.WireCoderSetting.getDefaultInstance());
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(componentsBuilder.build());
+    try {
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> res =
+          (WindowedValue.FullWindowedValueCoder<T>) rehydratedComponents.getCoder(coderId);
+      return res;
+    } catch (IOException ex) {
+      throw new IllegalStateException(ex);
+    }
+  }
+
+  private static <InputT, OutputT> WindowedValue<OutputT> intoWireTypes(
+      Coder<WindowedValue<InputT>> inCoder,
+      Coder<WindowedValue<OutputT>> outCoder,
+      WindowedValue<InputT> value) {
+
+    try {
+      return CoderUtils.decodeFromByteArray(outCoder, CoderUtils.encodeToByteArray(inCoder, value));
+    } catch (CoderException ex) {
+      throw new IllegalStateException(ex);

Review comment:
       nit: missing msg

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +594,55 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                 .keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                 .flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
     } catch (Exception e) {
       throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
     }
+  }
 
-    return source;
+  private static <T> WindowedValue.FullWindowedValueCoder<T> getSdkCoder(
+      String pCollectionId, RunnerApi.Components components) {
+
+    PipelineNode.PCollectionNode pCollectionNode =
+        PipelineNode.pCollection(pCollectionId, components.getPcollectionsOrThrow(pCollectionId));
+    RunnerApi.Components.Builder componentsBuilder = components.toBuilder();
+    String coderId =
+        WireCoders.addSdkWireCoder(
+            pCollectionNode,
+            componentsBuilder,
+            RunnerApi.ExecutableStagePayload.WireCoderSetting.getDefaultInstance());
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(componentsBuilder.build());
+    try {
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> res =
+          (WindowedValue.FullWindowedValueCoder<T>) rehydratedComponents.getCoder(coderId);
+      return res;
+    } catch (IOException ex) {
+      throw new IllegalStateException(ex);

Review comment:
       nit: missing msg




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dmvk commented on a change in pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695907153



##########
File path: runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourcePortableTest.java
##########
@@ -81,7 +99,8 @@ public static void tearDown() throws InterruptedException {
 
   @Test(timeout = 120_000)
   public void testExecution() throws Exception {

Review comment:
       Are we missing a test case for bounded read? Current validates runner test suites won't cover it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on a change in pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695791725



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
##########
@@ -340,8 +340,7 @@ public OutputT expand(InputT input) {
 
     boolean isJavaSDKCompatible(RunnerApi.Components components, String coderId) {
       RunnerApi.Coder coder = components.getCodersOrThrow(coderId);
-      if (!CoderTranslation.JAVA_SERIALIZED_CODER_URN.equals(coder.getSpec().getUrn())
-          && !CoderTranslation.KNOWN_CODER_URNS.containsValue(coder.getSpec().getUrn())) {
+      if (!CoderTranslation.isKnownCoderURN(coder.getSpec().getUrn())) {

Review comment:
       This was changed when I was trying to get `beam:coders:java:0.1` URN to `CoderTranslation`. It is not needed, reverted.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +594,55 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                 .keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                 .flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
     } catch (Exception e) {
       throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
     }
+  }
 
-    return source;
+  private static <T> WindowedValue.FullWindowedValueCoder<T> getSdkCoder(

Review comment:
       done

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
##########
@@ -150,7 +151,8 @@ public boolean reachedEnd() throws IOException {
 
   @Override
   public void close() throws IOException {
-    metricContainer.registerMetricsForPipelineResult();
+    Optional.ofNullable(metricContainer)
+        .ifPresent(FlinkMetricContainer::registerMetricsForPipelineResult);

Review comment:
       fixed

##########
File path: runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourcePortableTest.java
##########
@@ -81,7 +96,8 @@ public static void tearDown() throws InterruptedException {
 
   @Test(timeout = 120_000)
   public void testExecution() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.fromArgs("--experiments=beam_fn_api").create();
+    PipelineOptions options =
+        PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read").create();

Review comment:
       I think that is tested in other tests.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +594,55 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                 .keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                 .flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
     } catch (Exception e) {
       throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
     }
+  }
 
-    return source;
+  private static <T> WindowedValue.FullWindowedValueCoder<T> getSdkCoder(
+      String pCollectionId, RunnerApi.Components components) {
+
+    PipelineNode.PCollectionNode pCollectionNode =
+        PipelineNode.pCollection(pCollectionId, components.getPcollectionsOrThrow(pCollectionId));
+    RunnerApi.Components.Builder componentsBuilder = components.toBuilder();
+    String coderId =
+        WireCoders.addSdkWireCoder(
+            pCollectionNode,
+            componentsBuilder,
+            RunnerApi.ExecutableStagePayload.WireCoderSetting.getDefaultInstance());
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(componentsBuilder.build());
+    try {
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> res =
+          (WindowedValue.FullWindowedValueCoder<T>) rehydratedComponents.getCoder(coderId);
+      return res;
+    } catch (IOException ex) {
+      throw new IllegalStateException(ex);
+    }
+  }
+
+  private static <InputT, OutputT> WindowedValue<OutputT> intoWireTypes(
+      Coder<WindowedValue<InputT>> inCoder,
+      Coder<WindowedValue<OutputT>> outCoder,
+      WindowedValue<InputT> value) {
+
+    try {
+      return CoderUtils.decodeFromByteArray(outCoder, CoderUtils.encodeToByteArray(inCoder, value));
+    } catch (CoderException ex) {
+      throw new IllegalStateException(ex);

Review comment:
       I don't see any added value of the message, the cause will be visible from the stack trace.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
##########
@@ -86,12 +86,11 @@ public T createInstance() {
   public T copy(T t) {
     if (fasterCopy) {
       return t;
-    } else {
-      try {
-        return CoderUtils.clone(coder, t);
-      } catch (CoderException e) {
-        throw new RuntimeException("Could not clone.", e);
-      }
+    }
+    try {
+      return CoderUtils.clone(coder, t);
+    } catch (Exception e) {
+      throw new RuntimeException("Could not clone value " + t + " using " + coder, e);

Review comment:
       part of debugging, reverted.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +594,55 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                 .keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                 .flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
     } catch (Exception e) {
       throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
     }
+  }
 
-    return source;
+  private static <T> WindowedValue.FullWindowedValueCoder<T> getSdkCoder(
+      String pCollectionId, RunnerApi.Components components) {
+
+    PipelineNode.PCollectionNode pCollectionNode =
+        PipelineNode.pCollection(pCollectionId, components.getPcollectionsOrThrow(pCollectionId));
+    RunnerApi.Components.Builder componentsBuilder = components.toBuilder();
+    String coderId =
+        WireCoders.addSdkWireCoder(
+            pCollectionNode,
+            componentsBuilder,
+            RunnerApi.ExecutableStagePayload.WireCoderSetting.getDefaultInstance());
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(componentsBuilder.build());
+    try {
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> res =
+          (WindowedValue.FullWindowedValueCoder<T>) rehydratedComponents.getCoder(coderId);
+      return res;
+    } catch (IOException ex) {
+      throw new IllegalStateException(ex);
+    }
+  }
+
+  private static <InputT, OutputT> WindowedValue<OutputT> intoWireTypes(
+      Coder<WindowedValue<InputT>> inCoder,
+      Coder<WindowedValue<OutputT>> outCoder,
+      WindowedValue<InputT> value) {
+
+    try {
+      return CoderUtils.decodeFromByteArray(outCoder, CoderUtils.encodeToByteArray(inCoder, value));

Review comment:
       It definitely is suboptimal, but given the performance penalty of portability itself I seriously doubt this will be addressed separately. This can be optimized when (and if) we get to the stage that we will unify classical and portable runners (so that classical runners will be portable runners that are able to inline complete pipeline, because the pipeline is written in the same SDK as the runner).

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -500,28 +547,41 @@ public void flatMap(T t, Collector<T> collector) {
 
     final DataStream<WindowedValue<T>> source;
     final DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
-    Coder<WindowedValue<T>> windowCoder =
-        instantiateCoder(outputCollectionId, pipeline.getComponents());
 
-    TypeInformation<WindowedValue<T>> outputTypeInfo =
-        new CoderTypeInformation<>(windowCoder, pipelineOptions);
+    @SuppressWarnings("unchecked")
+    UnboundedSource<T, ?> unboundedSource =
+        (UnboundedSource<T, ?>) ReadTranslation.unboundedSourceFromProto(payload);
 
-    WindowingStrategy windowStrategy =
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<T, ?> windowStrategy =
         getWindowingStrategy(outputCollectionId, pipeline.getComponents());
-    TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
-        new CoderTypeInformation<>(
-            WindowedValue.getFullCoder(
-                ValueWithRecordId.ValueWithRecordIdCoder.of(
-                    ((WindowedValueCoder) windowCoder).getValueCoder()),
-                windowStrategy.getWindowFn().windowCoder()),
-            pipelineOptions);
-
-    UnboundedSource unboundedSource = ReadTranslation.unboundedSourceFromProto(payload);
 
     try {
+
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> wireCoder =
+          (WindowedValue.FullWindowedValueCoder<T>)
+              (Coder) instantiateCoder(outputCollectionId, pipeline.getComponents());

Review comment:
       fixed

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -474,20 +477,64 @@ public void flatMap(T t, Collector<T> collector) {
       throw new RuntimeException("Failed to parse ReadPayload from transform", e);
     }
 
-    Preconditions.checkState(
-        payload.getIsBounded() != RunnerApi.IsBounded.Enum.BOUNDED,
-        "Bounded reads should run inside an environment instead of being translated by the Runner.");
+    final DataStream<WindowedValue<T>> source;
+    if (payload.getIsBounded() == RunnerApi.IsBounded.Enum.BOUNDED) {
+      source =
+          translateBoundedSource(
+              transform.getUniqueName(),
+              outputCollectionId,
+              payload,
+              pipeline,
+              context.getPipelineOptions(),
+              context.getExecutionEnvironment());
+    } else {
+      source =
+          translateUnboundedSource(
+              transform.getUniqueName(),
+              outputCollectionId,
+              payload,
+              pipeline,
+              context.getPipelineOptions(),
+              context.getExecutionEnvironment());
+    }
+    context.addDataStream(outputCollectionId, source);
+  }
 
-    DataStream<WindowedValue<T>> source =
-        translateUnboundedSource(
-            transform.getUniqueName(),
-            outputCollectionId,
-            payload,
-            pipeline,
-            context.getPipelineOptions(),
-            context.getExecutionEnvironment());
+  private <T> DataStream<WindowedValue<T>> translateBoundedSource(
+      String transformName,
+      String outputCollectionId,
+      RunnerApi.ReadPayload payload,
+      RunnerApi.Pipeline pipeline,
+      FlinkPipelineOptions pipelineOptions,
+      StreamExecutionEnvironment env) {
 
-    context.addDataStream(outputCollectionId, source);
+    try {
+      @SuppressWarnings("unchecked")
+      BoundedSource<T> boundedSource =
+          (BoundedSource<T>) ReadTranslation.boundedSourceFromProto(payload);
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> wireCoder =
+          (WindowedValue.FullWindowedValueCoder<T>)

Review comment:
       fixed

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +594,55 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                 .keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                 .flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
     } catch (Exception e) {
       throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
     }
+  }
 
-    return source;
+  private static <T> WindowedValue.FullWindowedValueCoder<T> getSdkCoder(
+      String pCollectionId, RunnerApi.Components components) {
+
+    PipelineNode.PCollectionNode pCollectionNode =
+        PipelineNode.pCollection(pCollectionId, components.getPcollectionsOrThrow(pCollectionId));
+    RunnerApi.Components.Builder componentsBuilder = components.toBuilder();
+    String coderId =
+        WireCoders.addSdkWireCoder(
+            pCollectionNode,
+            componentsBuilder,
+            RunnerApi.ExecutableStagePayload.WireCoderSetting.getDefaultInstance());
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(componentsBuilder.build());
+    try {
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> res =
+          (WindowedValue.FullWindowedValueCoder<T>) rehydratedComponents.getCoder(coderId);
+      return res;
+    } catch (IOException ex) {
+      throw new IllegalStateException(ex);

Review comment:
       same as below, the message will not bring any more information here.

##########
File path: runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourcePortableTest.java
##########
@@ -90,10 +106,16 @@ public void testExecution() throws Exception {
         .as(PortablePipelineOptions.class)
         .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
     Pipeline p = Pipeline.create(options);
-    PCollection<Long> result = p.apply(GenerateSequence.from(0L).to(10L));
+    PCollection<Long> result =
+        p.apply(Read.from(new Source(10)))
+            // FIXME: the test fails without this

Review comment:
       That is unrelated, there is some bug in TrivialNativeTransformExpander or GreedyPipelineFuser, or somewhere in between.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -500,28 +547,41 @@ public void flatMap(T t, Collector<T> collector) {
 
     final DataStream<WindowedValue<T>> source;
     final DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
-    Coder<WindowedValue<T>> windowCoder =
-        instantiateCoder(outputCollectionId, pipeline.getComponents());
 
-    TypeInformation<WindowedValue<T>> outputTypeInfo =
-        new CoderTypeInformation<>(windowCoder, pipelineOptions);
+    @SuppressWarnings("unchecked")
+    UnboundedSource<T, ?> unboundedSource =
+        (UnboundedSource<T, ?>) ReadTranslation.unboundedSourceFromProto(payload);
 
-    WindowingStrategy windowStrategy =
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<T, ?> windowStrategy =
         getWindowingStrategy(outputCollectionId, pipeline.getComponents());
-    TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
-        new CoderTypeInformation<>(
-            WindowedValue.getFullCoder(
-                ValueWithRecordId.ValueWithRecordIdCoder.of(
-                    ((WindowedValueCoder) windowCoder).getValueCoder()),
-                windowStrategy.getWindowFn().windowCoder()),
-            pipelineOptions);
-
-    UnboundedSource unboundedSource = ReadTranslation.unboundedSourceFromProto(payload);
 
     try {
+
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> wireCoder =
+          (WindowedValue.FullWindowedValueCoder<T>)
+              (Coder) instantiateCoder(outputCollectionId, pipeline.getComponents());
+
+      WindowedValue.FullWindowedValueCoder<T> sdkCoder =
+          getSdkCoder(outputCollectionId, pipeline.getComponents());
+
+      CoderTypeInformation<WindowedValue<T>> outputTypeInfo =
+          new CoderTypeInformation<>(wireCoder, pipelineOptions);
+
+      CoderTypeInformation<WindowedValue<T>> sdkTypeInformation =
+          new CoderTypeInformation<>(sdkCoder, pipelineOptions);
+
+      TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
+          new CoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  ValueWithRecordId.ValueWithRecordIdCoder.of(sdkCoder.getValueCoder()),
+                  windowStrategy.getWindowFn().windowCoder()),
+              pipelineOptions);
+
       int parallelism =

Review comment:
       that is not related to this change




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on pull request #15370: [DO NOT MERGE] [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #15370:
URL: https://github.com/apache/beam/pull/15370#issuecomment-903531388


   Run XVR_Flink PostCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dmvk commented on a change in pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695904747



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
##########
@@ -150,7 +151,8 @@ public boolean reachedEnd() throws IOException {
 
   @Override
   public void close() throws IOException {
-    metricContainer.registerMetricsForPipelineResult();
+    Optional.ofNullable(metricContainer)
+        .ifPresent(FlinkMetricContainer::registerMetricsForPipelineResult);
     // TODO null check can be removed once FLINK-3796 is fixed

Review comment:
       OK, we can take look into this in separate issue. This shouldn't make any difference over the current state.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dmvk commented on a change in pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695709072



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
##########
@@ -86,12 +86,11 @@ public T createInstance() {
   public T copy(T t) {
     if (fasterCopy) {
       return t;
-    } else {
-      try {
-        return CoderUtils.clone(coder, t);
-      } catch (CoderException e) {
-        throw new RuntimeException("Could not clone.", e);
-      }
+    }
+    try {
+      return CoderUtils.clone(coder, t);
+    } catch (Exception e) {
+      throw new RuntimeException("Could not clone value " + t + " using " + coder, e);

Review comment:
       Unrelated change.
   
   Can this result in a massive log message?

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +594,55 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                 .keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                 .flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
     } catch (Exception e) {
       throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
     }
+  }
 
-    return source;
+  private static <T> WindowedValue.FullWindowedValueCoder<T> getSdkCoder(

Review comment:
       It would be good to add javadoc for this and `intoWireTypes`. Something along the lines of "this simulates wired protocol of SDK harness (we basically need to get its response format)".

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
##########
@@ -340,8 +340,7 @@ public OutputT expand(InputT input) {
 
     boolean isJavaSDKCompatible(RunnerApi.Components components, String coderId) {
       RunnerApi.Coder coder = components.getCodersOrThrow(coderId);
-      if (!CoderTranslation.JAVA_SERIALIZED_CODER_URN.equals(coder.getSpec().getUrn())
-          && !CoderTranslation.KNOWN_CODER_URNS.containsValue(coder.getSpec().getUrn())) {
+      if (!CoderTranslation.isKnownCoderURN(coder.getSpec().getUrn())) {

Review comment:
       I don't think this is correct `isKnownCoderURN` method doesn't say anything about java compatibility 
   
   I'd prefer removing this change

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -500,28 +547,41 @@ public void flatMap(T t, Collector<T> collector) {
 
     final DataStream<WindowedValue<T>> source;
     final DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
-    Coder<WindowedValue<T>> windowCoder =
-        instantiateCoder(outputCollectionId, pipeline.getComponents());
 
-    TypeInformation<WindowedValue<T>> outputTypeInfo =
-        new CoderTypeInformation<>(windowCoder, pipelineOptions);
+    @SuppressWarnings("unchecked")
+    UnboundedSource<T, ?> unboundedSource =
+        (UnboundedSource<T, ?>) ReadTranslation.unboundedSourceFromProto(payload);
 
-    WindowingStrategy windowStrategy =
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<T, ?> windowStrategy =
         getWindowingStrategy(outputCollectionId, pipeline.getComponents());
-    TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
-        new CoderTypeInformation<>(
-            WindowedValue.getFullCoder(
-                ValueWithRecordId.ValueWithRecordIdCoder.of(
-                    ((WindowedValueCoder) windowCoder).getValueCoder()),
-                windowStrategy.getWindowFn().windowCoder()),
-            pipelineOptions);
-
-    UnboundedSource unboundedSource = ReadTranslation.unboundedSourceFromProto(payload);
 
     try {
+
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> wireCoder =
+          (WindowedValue.FullWindowedValueCoder<T>)
+              (Coder) instantiateCoder(outputCollectionId, pipeline.getComponents());
+
+      WindowedValue.FullWindowedValueCoder<T> sdkCoder =
+          getSdkCoder(outputCollectionId, pipeline.getComponents());
+
+      CoderTypeInformation<WindowedValue<T>> outputTypeInfo =
+          new CoderTypeInformation<>(wireCoder, pipelineOptions);
+
+      CoderTypeInformation<WindowedValue<T>> sdkTypeInformation =
+          new CoderTypeInformation<>(sdkCoder, pipelineOptions);
+
+      TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
+          new CoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  ValueWithRecordId.ValueWithRecordIdCoder.of(sdkCoder.getValueCoder()),
+                  windowStrategy.getWindowFn().windowCoder()),
+              pipelineOptions);
+
       int parallelism =

Review comment:
       parallelism calculation seems incorrect

##########
File path: website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
##########
@@ -77,11 +77,6 @@
   <td>Remove unneeded deep copy between operators. See https://issues.apache.org/jira/browse/BEAM-11146</td>
   <td>Default: <code>false</code></td>
 </tr>
-<tr>
-  <td><code>filesToStage</code></td>

Review comment:
       Why is this removed?

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
##########
@@ -150,7 +151,8 @@ public boolean reachedEnd() throws IOException {
 
   @Override
   public void close() throws IOException {
-    metricContainer.registerMetricsForPipelineResult();
+    Optional.ofNullable(metricContainer)
+        .ifPresent(FlinkMetricContainer::registerMetricsForPipelineResult);
     // TODO null check can be removed once FLINK-3796 is fixed

Review comment:
       FLINK-3796 is already fixed, it should be no longer necessary to check whether `metricContainer` and `reader` are null.
   
   

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -500,28 +547,41 @@ public void flatMap(T t, Collector<T> collector) {
 
     final DataStream<WindowedValue<T>> source;
     final DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
-    Coder<WindowedValue<T>> windowCoder =
-        instantiateCoder(outputCollectionId, pipeline.getComponents());
 
-    TypeInformation<WindowedValue<T>> outputTypeInfo =
-        new CoderTypeInformation<>(windowCoder, pipelineOptions);
+    @SuppressWarnings("unchecked")
+    UnboundedSource<T, ?> unboundedSource =
+        (UnboundedSource<T, ?>) ReadTranslation.unboundedSourceFromProto(payload);
 
-    WindowingStrategy windowStrategy =
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<T, ?> windowStrategy =
         getWindowingStrategy(outputCollectionId, pipeline.getComponents());
-    TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
-        new CoderTypeInformation<>(
-            WindowedValue.getFullCoder(
-                ValueWithRecordId.ValueWithRecordIdCoder.of(
-                    ((WindowedValueCoder) windowCoder).getValueCoder()),
-                windowStrategy.getWindowFn().windowCoder()),
-            pipelineOptions);
-
-    UnboundedSource unboundedSource = ReadTranslation.unboundedSourceFromProto(payload);
 
     try {
+
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> wireCoder =
+          (WindowedValue.FullWindowedValueCoder<T>)
+              (Coder) instantiateCoder(outputCollectionId, pipeline.getComponents());
+
+      WindowedValue.FullWindowedValueCoder<T> sdkCoder =
+          getSdkCoder(outputCollectionId, pipeline.getComponents());
+
+      CoderTypeInformation<WindowedValue<T>> outputTypeInfo =
+          new CoderTypeInformation<>(wireCoder, pipelineOptions);
+
+      CoderTypeInformation<WindowedValue<T>> sdkTypeInformation =
+          new CoderTypeInformation<>(sdkCoder, pipelineOptions);
+
+      TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
+          new CoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  ValueWithRecordId.ValueWithRecordIdCoder.of(sdkCoder.getValueCoder()),
+                  windowStrategy.getWindowFn().windowCoder()),
+              pipelineOptions);
+
       int parallelism =

Review comment:
       (the variable name seems wrong)

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
##########
@@ -150,7 +151,8 @@ public boolean reachedEnd() throws IOException {
 
   @Override
   public void close() throws IOException {
-    metricContainer.registerMetricsForPipelineResult();
+    Optional.ofNullable(metricContainer)
+        .ifPresent(FlinkMetricContainer::registerMetricsForPipelineResult);

Review comment:
       I know that you personally find `Optional.ofNullable` more readable, but this shouldn't come at cost of inconsistent code style. (the reader check right bellow uses a classic if-statement nullness check, please stick to that)

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -500,28 +547,41 @@ public void flatMap(T t, Collector<T> collector) {
 
     final DataStream<WindowedValue<T>> source;
     final DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
-    Coder<WindowedValue<T>> windowCoder =
-        instantiateCoder(outputCollectionId, pipeline.getComponents());
 
-    TypeInformation<WindowedValue<T>> outputTypeInfo =
-        new CoderTypeInformation<>(windowCoder, pipelineOptions);
+    @SuppressWarnings("unchecked")
+    UnboundedSource<T, ?> unboundedSource =
+        (UnboundedSource<T, ?>) ReadTranslation.unboundedSourceFromProto(payload);
 
-    WindowingStrategy windowStrategy =
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<T, ?> windowStrategy =
         getWindowingStrategy(outputCollectionId, pipeline.getComponents());
-    TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
-        new CoderTypeInformation<>(
-            WindowedValue.getFullCoder(
-                ValueWithRecordId.ValueWithRecordIdCoder.of(
-                    ((WindowedValueCoder) windowCoder).getValueCoder()),
-                windowStrategy.getWindowFn().windowCoder()),
-            pipelineOptions);
-
-    UnboundedSource unboundedSource = ReadTranslation.unboundedSourceFromProto(payload);
 
     try {
+
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> wireCoder =
+          (WindowedValue.FullWindowedValueCoder<T>)
+              (Coder) instantiateCoder(outputCollectionId, pipeline.getComponents());

Review comment:
       nit: double cast

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -474,20 +477,64 @@ public void flatMap(T t, Collector<T> collector) {
       throw new RuntimeException("Failed to parse ReadPayload from transform", e);
     }
 
-    Preconditions.checkState(
-        payload.getIsBounded() != RunnerApi.IsBounded.Enum.BOUNDED,
-        "Bounded reads should run inside an environment instead of being translated by the Runner.");
+    final DataStream<WindowedValue<T>> source;
+    if (payload.getIsBounded() == RunnerApi.IsBounded.Enum.BOUNDED) {
+      source =
+          translateBoundedSource(
+              transform.getUniqueName(),
+              outputCollectionId,
+              payload,
+              pipeline,
+              context.getPipelineOptions(),
+              context.getExecutionEnvironment());
+    } else {
+      source =
+          translateUnboundedSource(
+              transform.getUniqueName(),
+              outputCollectionId,
+              payload,
+              pipeline,
+              context.getPipelineOptions(),
+              context.getExecutionEnvironment());
+    }
+    context.addDataStream(outputCollectionId, source);
+  }
 
-    DataStream<WindowedValue<T>> source =
-        translateUnboundedSource(
-            transform.getUniqueName(),
-            outputCollectionId,
-            payload,
-            pipeline,
-            context.getPipelineOptions(),
-            context.getExecutionEnvironment());
+  private <T> DataStream<WindowedValue<T>> translateBoundedSource(
+      String transformName,
+      String outputCollectionId,
+      RunnerApi.ReadPayload payload,
+      RunnerApi.Pipeline pipeline,
+      FlinkPipelineOptions pipelineOptions,
+      StreamExecutionEnvironment env) {
 
-    context.addDataStream(outputCollectionId, source);
+    try {
+      @SuppressWarnings("unchecked")
+      BoundedSource<T> boundedSource =
+          (BoundedSource<T>) ReadTranslation.boundedSourceFromProto(payload);
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> wireCoder =
+          (WindowedValue.FullWindowedValueCoder<T>)

Review comment:
       nit: double cast

##########
File path: runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourcePortableTest.java
##########
@@ -81,7 +96,8 @@ public static void tearDown() throws InterruptedException {
 
   @Test(timeout = 120_000)
   public void testExecution() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.fromArgs("--experiments=beam_fn_api").create();
+    PipelineOptions options =
+        PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read").create();

Review comment:
       should we also test a scenario without the flag?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #15370:
URL: https://github.com/apache/beam/pull/15370#issuecomment-903735485


   Run XVR_Flink PostCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dmvk commented on a change in pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695766582



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +594,55 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                 .keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                 .flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
     } catch (Exception e) {
       throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
     }
+  }
 
-    return source;
+  private static <T> WindowedValue.FullWindowedValueCoder<T> getSdkCoder(
+      String pCollectionId, RunnerApi.Components components) {
+
+    PipelineNode.PCollectionNode pCollectionNode =
+        PipelineNode.pCollection(pCollectionId, components.getPcollectionsOrThrow(pCollectionId));
+    RunnerApi.Components.Builder componentsBuilder = components.toBuilder();
+    String coderId =
+        WireCoders.addSdkWireCoder(
+            pCollectionNode,
+            componentsBuilder,
+            RunnerApi.ExecutableStagePayload.WireCoderSetting.getDefaultInstance());
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(componentsBuilder.build());
+    try {
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> res =
+          (WindowedValue.FullWindowedValueCoder<T>) rehydratedComponents.getCoder(coderId);
+      return res;
+    } catch (IOException ex) {
+      throw new IllegalStateException(ex);
+    }
+  }
+
+  private static <InputT, OutputT> WindowedValue<OutputT> intoWireTypes(
+      Coder<WindowedValue<InputT>> inCoder,
+      Coder<WindowedValue<OutputT>> outCoder,
+      WindowedValue<InputT> value) {
+
+    try {
+      return CoderUtils.decodeFromByteArray(outCoder, CoderUtils.encodeToByteArray(inCoder, value));

Review comment:
       This seems sub-optimal. Is it possible to optimize it? (if it's not easy, it would be nice to have a JIRA to track the issue)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on pull request #15370: [DO NOT MERGE] [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #15370:
URL: https://github.com/apache/beam/pull/15370#issuecomment-903531249


   Run Java Flink PortableValidatesRunner Streaming


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dmvk commented on a change in pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695897495



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -474,20 +479,64 @@ public void flatMap(T t, Collector<T> collector) {
       throw new RuntimeException("Failed to parse ReadPayload from transform", e);
     }
 
-    Preconditions.checkState(
-        payload.getIsBounded() != RunnerApi.IsBounded.Enum.BOUNDED,
-        "Bounded reads should run inside an environment instead of being translated by the Runner.");
+    final DataStream<WindowedValue<T>> source;
+    if (payload.getIsBounded() == RunnerApi.IsBounded.Enum.BOUNDED) {
+      source =
+          translateBoundedSource(
+              transform.getUniqueName(),
+              outputCollectionId,
+              payload,
+              pipeline,
+              context.getPipelineOptions(),
+              context.getExecutionEnvironment());
+    } else {
+      source =
+          translateUnboundedSource(
+              transform.getUniqueName(),
+              outputCollectionId,
+              payload,
+              pipeline,
+              context.getPipelineOptions(),
+              context.getExecutionEnvironment());
+    }
+    context.addDataStream(outputCollectionId, source);
+  }
 
-    DataStream<WindowedValue<T>> source =
-        translateUnboundedSource(
-            transform.getUniqueName(),
-            outputCollectionId,
-            payload,
-            pipeline,
-            context.getPipelineOptions(),
-            context.getExecutionEnvironment());
+  private <T> DataStream<WindowedValue<T>> translateBoundedSource(
+      String transformName,
+      String outputCollectionId,
+      RunnerApi.ReadPayload payload,
+      RunnerApi.Pipeline pipeline,
+      FlinkPipelineOptions pipelineOptions,
+      StreamExecutionEnvironment env) {
 
-    context.addDataStream(outputCollectionId, source);
+    try {
+      @SuppressWarnings("unchecked")
+      BoundedSource<T> boundedSource =
+          (BoundedSource<T>) ReadTranslation.boundedSourceFromProto(payload);
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> wireCoder =
+          (WindowedValue.FullWindowedValueCoder)
+              instantiateCoder(outputCollectionId, pipeline.getComponents());
+
+      WindowedValue.FullWindowedValueCoder<T> sdkCoder =
+          getSdkCoder(outputCollectionId, pipeline.getComponents());
+
+      CoderTypeInformation<WindowedValue<T>> outputTypeInfo =
+          new CoderTypeInformation<>(wireCoder, pipelineOptions);
+
+      CoderTypeInformation<WindowedValue<T>> sdkTypeInfo =
+          new CoderTypeInformation<>(sdkCoder, pipelineOptions);
+
+      return env.createInput(new SourceInputFormat<>(transformName, boundedSource, pipelineOptions))
+          .name(transformName)
+          .uid(transformName)
+          .returns(sdkTypeInfo)
+          .map(value -> intoWireTypes(sdkCoder, wireCoder, value))

Review comment:
       It would be nice to give this transform a name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #15370:
URL: https://github.com/apache/beam/pull/15370#issuecomment-903886762


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dmvk commented on a change in pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695895434



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +594,55 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                 .keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                 .flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
     } catch (Exception e) {
       throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
     }
+  }
 
-    return source;
+  private static <T> WindowedValue.FullWindowedValueCoder<T> getSdkCoder(
+      String pCollectionId, RunnerApi.Components components) {
+
+    PipelineNode.PCollectionNode pCollectionNode =
+        PipelineNode.pCollection(pCollectionId, components.getPcollectionsOrThrow(pCollectionId));
+    RunnerApi.Components.Builder componentsBuilder = components.toBuilder();
+    String coderId =
+        WireCoders.addSdkWireCoder(
+            pCollectionNode,
+            componentsBuilder,
+            RunnerApi.ExecutableStagePayload.WireCoderSetting.getDefaultInstance());
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(componentsBuilder.build());
+    try {
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> res =
+          (WindowedValue.FullWindowedValueCoder<T>) rehydratedComponents.getCoder(coderId);
+      return res;
+    } catch (IOException ex) {
+      throw new IllegalStateException(ex);
+    }
+  }
+
+  private static <InputT, OutputT> WindowedValue<OutputT> intoWireTypes(
+      Coder<WindowedValue<InputT>> inCoder,
+      Coder<WindowedValue<OutputT>> outCoder,
+      WindowedValue<InputT> value) {
+
+    try {
+      return CoderUtils.decodeFromByteArray(outCoder, CoderUtils.encodeToByteArray(inCoder, value));
+    } catch (CoderException ex) {
+      throw new IllegalStateException(ex);

Review comment:
       Same as above. It makes a huge difference if you say "I wasn't able to translate element from format X to format Y" then for example "buffer overflow".
   
   Which one would you find more useful?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #15370:
URL: https://github.com/apache/beam/pull/15370#issuecomment-903735388


   Run Java Flink PortableValidatesRunner Streaming


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on pull request #15370: [DO NOT MERGE] [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #15370:
URL: https://github.com/apache/beam/pull/15370#issuecomment-903329711


   Run XVR_Flink PostCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dmvk commented on a change in pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695886857



##########
File path: website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
##########
@@ -77,11 +77,6 @@
   <td>Remove unneeded deep copy between operators. See https://issues.apache.org/jira/browse/BEAM-11146</td>
   <td>Default: <code>false</code></td>
 </tr>
-<tr>
-  <td><code>filesToStage</code></td>

Review comment:
       We should either explain this, or remove this change. (if this is needed, master build should be failing???)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dmvk commented on a change in pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695897495



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -474,20 +479,64 @@ public void flatMap(T t, Collector<T> collector) {
       throw new RuntimeException("Failed to parse ReadPayload from transform", e);
     }
 
-    Preconditions.checkState(
-        payload.getIsBounded() != RunnerApi.IsBounded.Enum.BOUNDED,
-        "Bounded reads should run inside an environment instead of being translated by the Runner.");
+    final DataStream<WindowedValue<T>> source;
+    if (payload.getIsBounded() == RunnerApi.IsBounded.Enum.BOUNDED) {
+      source =
+          translateBoundedSource(
+              transform.getUniqueName(),
+              outputCollectionId,
+              payload,
+              pipeline,
+              context.getPipelineOptions(),
+              context.getExecutionEnvironment());
+    } else {
+      source =
+          translateUnboundedSource(
+              transform.getUniqueName(),
+              outputCollectionId,
+              payload,
+              pipeline,
+              context.getPipelineOptions(),
+              context.getExecutionEnvironment());
+    }
+    context.addDataStream(outputCollectionId, source);
+  }
 
-    DataStream<WindowedValue<T>> source =
-        translateUnboundedSource(
-            transform.getUniqueName(),
-            outputCollectionId,
-            payload,
-            pipeline,
-            context.getPipelineOptions(),
-            context.getExecutionEnvironment());
+  private <T> DataStream<WindowedValue<T>> translateBoundedSource(
+      String transformName,
+      String outputCollectionId,
+      RunnerApi.ReadPayload payload,
+      RunnerApi.Pipeline pipeline,
+      FlinkPipelineOptions pipelineOptions,
+      StreamExecutionEnvironment env) {
 
-    context.addDataStream(outputCollectionId, source);
+    try {
+      @SuppressWarnings("unchecked")
+      BoundedSource<T> boundedSource =
+          (BoundedSource<T>) ReadTranslation.boundedSourceFromProto(payload);
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> wireCoder =
+          (WindowedValue.FullWindowedValueCoder)
+              instantiateCoder(outputCollectionId, pipeline.getComponents());
+
+      WindowedValue.FullWindowedValueCoder<T> sdkCoder =
+          getSdkCoder(outputCollectionId, pipeline.getComponents());
+
+      CoderTypeInformation<WindowedValue<T>> outputTypeInfo =
+          new CoderTypeInformation<>(wireCoder, pipelineOptions);
+
+      CoderTypeInformation<WindowedValue<T>> sdkTypeInfo =
+          new CoderTypeInformation<>(sdkCoder, pipelineOptions);
+
+      return env.createInput(new SourceInputFormat<>(transformName, boundedSource, pipelineOptions))
+          .name(transformName)
+          .uid(transformName)
+          .returns(sdkTypeInfo)
+          .map(value -> intoWireTypes(sdkCoder, wireCoder, value))

Review comment:
       It would be nice to give this transform a name (and uid).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #15370:
URL: https://github.com/apache/beam/pull/15370#issuecomment-903796990


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dmvk commented on a change in pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695893901



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +594,55 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                 .keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                 .flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
     } catch (Exception e) {
       throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
     }
+  }
 
-    return source;
+  private static <T> WindowedValue.FullWindowedValueCoder<T> getSdkCoder(
+      String pCollectionId, RunnerApi.Components components) {
+
+    PipelineNode.PCollectionNode pCollectionNode =
+        PipelineNode.pCollection(pCollectionId, components.getPcollectionsOrThrow(pCollectionId));
+    RunnerApi.Components.Builder componentsBuilder = components.toBuilder();
+    String coderId =
+        WireCoders.addSdkWireCoder(
+            pCollectionNode,
+            componentsBuilder,
+            RunnerApi.ExecutableStagePayload.WireCoderSetting.getDefaultInstance());
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(componentsBuilder.build());
+    try {
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> res =
+          (WindowedValue.FullWindowedValueCoder<T>) rehydratedComponents.getCoder(coderId);
+      return res;
+    } catch (IOException ex) {
+      throw new IllegalStateException(ex);

Review comment:
       That's not true. This can be any exception that has been thrown deep down the stack.
   
   Think about this as a scoping / providing more context, you basically say this exception (I don't care which exception) affected "coder retrieval".
   
   Typical users won't search the source code to get more context about where the exception has been thrown.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on a change in pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695796605



##########
File path: website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
##########
@@ -77,11 +77,6 @@
   <td>Remove unneeded deep copy between operators. See https://issues.apache.org/jira/browse/BEAM-11146</td>
   <td>Default: <code>false</code></td>
 </tr>
-<tr>
-  <td><code>filesToStage</code></td>

Review comment:
       Don't know. That does some automated tool (maybe part of spotless).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on pull request #15370: [DO NOT MERGE] [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #15370:
URL: https://github.com/apache/beam/pull/15370#issuecomment-903327951


   Run Java Flink PortableValidatesRunner Streaming


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #15370:
URL: https://github.com/apache/beam/pull/15370#issuecomment-904022592


   @lukecwik cool, the pre-commit finally passed, would you have time to review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on a change in pull request #15370: [BEAM-12704] flink primitive read

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695797039



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
##########
@@ -150,7 +151,8 @@ public boolean reachedEnd() throws IOException {
 
   @Override
   public void close() throws IOException {
-    metricContainer.registerMetricsForPipelineResult();
+    Optional.ofNullable(metricContainer)
+        .ifPresent(FlinkMetricContainer::registerMetricsForPipelineResult);
     // TODO null check can be removed once FLINK-3796 is fixed

Review comment:
       Yes, I thought, but unfortunately this still appears to be null sometimes. I didn't check why.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org