Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/25 17:06:54 UTC

[GitHub] [beam] kennknowles opened a new pull request #14339: Roll forward PJS with Pubsub fix

kennknowles opened a new pull request #14339:
URL: https://github.com/apache/beam/pull/14339


   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kennknowles commented on a change in pull request #14339: Roll forward PJS with Pubsub fix

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #14339:
URL: https://github.com/apache/beam/pull/14339#discussion_r603594664



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable and non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    if (useUnifiedWorker(options)) {
+      pipeline.replaceAll(getPortableOverrides());
+    }
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.info("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
+    // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
+    // the options on the new job
+    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options
+    LOG.info("Staging portable pipeline proto to {}", options.getStagingLocation());
+    byte[] serializedProtoPipeline = portablePipelineProto.toByteArray();
 
-    List<DataflowPackage> packages = stageArtifacts(pipelineProto);
+    DataflowPackage stagedPipeline =
+        options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
+    dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
+    // Now rewrite things to be as needed for v1 (mutates the pipeline)
+    replaceTransforms(pipeline);
+    // Capture the SdkComponents for look up during step translations
+    SdkComponents dataflowV1Components = SdkComponents.create();
+    dataflowV1Components.registerEnvironment(
+        defaultEnvironmentForDataflow
+            .toBuilder()
+            .addAllDependencies(getDefaultArtifacts())
+            .addAllCapabilities(Environments.getJavaCapabilities())
+            .build());
+    RunnerApi.Pipeline dataflowV1PipelineProto =
+        PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
+    LOG.info("Dataflow v1 pipeline proto:\n{}", TextFormat.printToString(dataflowV1PipelineProto));
+    List<DataflowPackage> packages = stageArtifacts(dataflowV1PipelineProto);

Review comment:
       OK, I reviewed my comment. The problem case is runner v2 without portable job submission. The variable is actually misnamed: it should be `dataflowV1beta3PipelineProto`, since it is the proto used for runner v2 when portable job submission is not enabled. A sketch of the rename follows.
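
       A minimal sketch of the rename (hypothetical; the surrounding code comes from the diff above):

       ```java
       // Hypothetical rename per the comment above: this proto feeds the v1beta3
       // job submission path, which runner v2 still uses when portable job
       // submission is not enabled.
       RunnerApi.Pipeline dataflowV1beta3PipelineProto =
           PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
       List<DataflowPackage> packages = stageArtifacts(dataflowV1beta3PipelineProto);
       ```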







[GitHub] [beam] kennknowles merged pull request #14339: Roll forward PJS with Pubsub fix

Posted by GitBox <gi...@apache.org>.
kennknowles merged pull request #14339:
URL: https://github.com/apache/beam/pull/14339


   





[GitHub] [beam] kennknowles commented on pull request #14339: Roll forward PJS with Pubsub fix

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #14339:
URL: https://github.com/apache/beam/pull/14339#issuecomment-807109742


   Run Java Examples on Dataflow Runner V2





[GitHub] [beam] boyuanzz commented on a change in pull request #14339: Roll forward PJS with Pubsub fix

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #14339:
URL: https://github.com/apache/beam/pull/14339#discussion_r603662583



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable and non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    if (useUnifiedWorker(options)) {
+      pipeline.replaceAll(getPortableOverrides());
+    }
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.info("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
+    // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
+    // the options on the new job
+    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options
+    LOG.info("Staging portable pipeline proto to {}", options.getStagingLocation());
+    byte[] serializedProtoPipeline = portablePipelineProto.toByteArray();
 
-    List<DataflowPackage> packages = stageArtifacts(pipelineProto);
+    DataflowPackage stagedPipeline =
+        options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
+    dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());

Review comment:
       The motivation for me to do so is that it will make cleanup easier later (e.g., keep everything in the `if` block and remove the rest in an `else`), as sketched below.
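
       Roughly what I have in mind (a sketch of the suggested shape, reusing names from the diff above; not the actual PR code):

       ```java
       if (useUnifiedWorker(options)) {
         // Portable job submission: stage the portable proto and point the job at it.
         DataflowPackage stagedPipeline =
             options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
         dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
       } else {
         // Legacy-only work would live here, so cleanup later is just deleting
         // this branch.
       }
       ```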







[GitHub] [beam] boyuanzz commented on a change in pull request #14339: Roll forward PJS with Pubsub fix

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #14339:
URL: https://github.com/apache/beam/pull/14339#discussion_r603661340



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable and non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    if (useUnifiedWorker(options)) {
+      pipeline.replaceAll(getPortableOverrides());
+    }
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.info("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
+    // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
+    // the options on the new job
+    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options
+    LOG.info("Staging portable pipeline proto to {}", options.getStagingLocation());
+    byte[] serializedProtoPipeline = portablePipelineProto.toByteArray();
 
-    List<DataflowPackage> packages = stageArtifacts(pipelineProto);
+    DataflowPackage stagedPipeline =
+        options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
+    dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
+    // Now rewrite things to be as needed for v1 (mutates the pipeline)
+    replaceTransforms(pipeline);
+    // Capture the SdkComponents for look up during step translations
+    SdkComponents dataflowV1Components = SdkComponents.create();
+    dataflowV1Components.registerEnvironment(
+        defaultEnvironmentForDataflow
+            .toBuilder()
+            .addAllDependencies(getDefaultArtifacts())
+            .addAllCapabilities(Environments.getJavaCapabilities())
+            .build());
+    RunnerApi.Pipeline dataflowV1PipelineProto =
+        PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
+    LOG.info("Dataflow v1 pipeline proto:\n{}", TextFormat.printToString(dataflowV1PipelineProto));
+    List<DataflowPackage> packages = stageArtifacts(dataflowV1PipelineProto);

Review comment:
       IIUC, the `dataflowV1PipelineProto` is the portable pipeline proto, right?







[GitHub] [beam] kennknowles commented on pull request #14339: Roll forward PJS with Pubsub fix

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #14339:
URL: https://github.com/apache/beam/pull/14339#issuecomment-807229946


   run java postcommit





[GitHub] [beam] kennknowles commented on pull request #14339: Roll forward PJS with Pubsub fix

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #14339:
URL: https://github.com/apache/beam/pull/14339#issuecomment-807580050


   Sorry for the many PRs, most of which got closed; I think this is the one to move forward with. All of the postcommit failures are known failures already present on `master` (see https://ci-beam.apache.org/job/beam_PostCommit_Java_PR/647/).
   
   I will do some internal testing as well.





[GitHub] [beam] kennknowles commented on a change in pull request #14339: Roll forward PJS with Pubsub fix

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #14339:
URL: https://github.com/apache/beam/pull/14339#discussion_r603591147



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable and non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    if (useUnifiedWorker(options)) {
+      pipeline.replaceAll(getPortableOverrides());
+    }
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.info("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));

Review comment:
       It is part of faee65e220dcca094a3c7ef8e398430545374470 ("DO NOT SUBMIT: Logging"), which I will drop.

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable and non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    if (useUnifiedWorker(options)) {
+      pipeline.replaceAll(getPortableOverrides());
+    }
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.info("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
+    // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
+    // the options on the new job
+    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options
+    LOG.info("Staging portable pipeline proto to {}", options.getStagingLocation());
+    byte[] serializedProtoPipeline = portablePipelineProto.toByteArray();
 
-    List<DataflowPackage> packages = stageArtifacts(pipelineProto);
+    DataflowPackage stagedPipeline =
+        options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
+    dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
+    // Now rewrite things to be as needed for v1 (mutates the pipeline)
+    replaceTransforms(pipeline);
+    // Capture the SdkComponents for look up during step translations
+    SdkComponents dataflowV1Components = SdkComponents.create();
+    dataflowV1Components.registerEnvironment(
+        defaultEnvironmentForDataflow
+            .toBuilder()
+            .addAllDependencies(getDefaultArtifacts())
+            .addAllCapabilities(Environments.getJavaCapabilities())
+            .build());
+    RunnerApi.Pipeline dataflowV1PipelineProto =
+        PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
+    LOG.info("Dataflow v1 pipeline proto:\n{}", TextFormat.printToString(dataflowV1PipelineProto));

Review comment:
       It is part of faee65e220dcca094a3c7ef8e398430545374470 ("DO NOT SUBMIT: Logging"), which I will drop.

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable and non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    if (useUnifiedWorker(options)) {
+      pipeline.replaceAll(getPortableOverrides());
+    }
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.info("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
+    // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
+    // the options on the new job
+    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options
+    LOG.info("Staging portable pipeline proto to {}", options.getStagingLocation());
+    byte[] serializedProtoPipeline = portablePipelineProto.toByteArray();
 
-    List<DataflowPackage> packages = stageArtifacts(pipelineProto);
+    DataflowPackage stagedPipeline =
+        options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
+    dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());

Review comment:
       I could do that if it adds something. Eventually the goal is to eliminate all `useUnifiedWorker` conditionals, so that both a valid v1beta3 submission and a valid UW submission are created no matter what; a sketch of that end state follows.
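
       Sketched out, that end state might look like this (an illustration of the stated goal, not code in this PR):

       ```java
       // No useUnifiedWorker branching: both submissions are always built.
       RunnerApi.Pipeline portablePipelineProto =
           PipelineTranslation.toProto(pipeline, portableComponents, false);
       RunnerApi.Pipeline dataflowV1beta3PipelineProto =
           PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
       // A valid v1beta3 submission and a valid UW (portable) submission now
       // exist no matter what.
       ```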

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable and non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    if (useUnifiedWorker(options)) {
+      pipeline.replaceAll(getPortableOverrides());
+    }
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.info("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
+    // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
+    // the options on the new job
+    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options
+    LOG.info("Staging portable pipeline proto to {}", options.getStagingLocation());
+    byte[] serializedProtoPipeline = portablePipelineProto.toByteArray();
 
-    List<DataflowPackage> packages = stageArtifacts(pipelineProto);
+    DataflowPackage stagedPipeline =
+        options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
+    dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
+    // Now rewrite things to be as needed for v1 (mutates the pipeline)
+    replaceTransforms(pipeline);
+    // Capture the SdkComponents for look up during step translations
+    SdkComponents dataflowV1Components = SdkComponents.create();
+    dataflowV1Components.registerEnvironment(
+        defaultEnvironmentForDataflow
+            .toBuilder()
+            .addAllDependencies(getDefaultArtifacts())
+            .addAllCapabilities(Environments.getJavaCapabilities())
+            .build());
+    RunnerApi.Pipeline dataflowV1PipelineProto =
+        PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
+    LOG.info("Dataflow v1 pipeline proto:\n{}", TextFormat.printToString(dataflowV1PipelineProto));
+    List<DataflowPackage> packages = stageArtifacts(dataflowV1PipelineProto);

Review comment:
       IIRC, this is why the environment was missing in the failing jobs earlier, though I've now forgotten the details. I will diff against the failing PR.







[GitHub] [beam] kennknowles commented on a change in pull request #14339: Roll forward PJS with Pubsub fix

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #14339:
URL: https://github.com/apache/beam/pull/14339#discussion_r603615692



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");

Review comment:
       Yes, the third commit does this, so we can get some signal right away on this PR. I can drop the last commit too.










[GitHub] [beam] kennknowles commented on pull request #14339: Roll forward PJS with Pubsub fix

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #14339:
URL: https://github.com/apache/beam/pull/14339#issuecomment-809716665


   run java postcommit





[GitHub] [beam] kennknowles commented on a change in pull request #14339: Roll forward PJS with Pubsub fix

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #14339:
URL: https://github.com/apache/beam/pull/14339#discussion_r603664340



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable and non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    if (useUnifiedWorker(options)) {
+      pipeline.replaceAll(getPortableOverrides());
+    }
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.info("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
+    // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
+    // the options on the new job
+    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options
+    LOG.info("Staging portable pipeline proto to {}", options.getStagingLocation());
+    byte[] serializedProtoPipeline = portablePipelineProto.toByteArray();
 
-    List<DataflowPackage> packages = stageArtifacts(pipelineProto);
+    DataflowPackage stagedPipeline =
+        options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
+    dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
+    // Now rewrite things to be as needed for v1 (mutates the pipeline)
+    replaceTransforms(pipeline);
+    // Capture the SdkComponents for look up during step translations
+    SdkComponents dataflowV1Components = SdkComponents.create();
+    dataflowV1Components.registerEnvironment(
+        defaultEnvironmentForDataflow
+            .toBuilder()
+            .addAllDependencies(getDefaultArtifacts())
+            .addAllCapabilities(Environments.getJavaCapabilities())
+            .build());
+    RunnerApi.Pipeline dataflowV1PipelineProto =
+        PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
+    LOG.info("Dataflow v1 pipeline proto:\n{}", TextFormat.printToString(dataflowV1PipelineProto));
+    List<DataflowPackage> packages = stageArtifacts(dataflowV1PipelineProto);

Review comment:
       `portablePipelineProto` is the pipeline after the runner v2 Pubsub overrides have been applied.

       `dataflowV1PipelineProto` is the pipeline with all of the other v1 and v2 overrides applied; the flow is sketched below.
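
       In flow order (a paraphrase of the diff above, using its names):

       ```java
       if (useUnifiedWorker(options)) {
         pipeline.replaceAll(getPortableOverrides());  // runner v2 Pubsub overrides
       }
       // Snapshot #1: the portable submission.
       RunnerApi.Pipeline portablePipelineProto =
           PipelineTranslation.toProto(pipeline, portableComponents, false);
       // ...stage portablePipelineProto to the staging location...
       // Then apply all the other v1 and v2 overrides (mutates the pipeline).
       replaceTransforms(pipeline);
       // Snapshot #2: the v1beta3 submission.
       RunnerApi.Pipeline dataflowV1PipelineProto =
           PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
       ```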







[GitHub] [beam] kennknowles commented on pull request #14339: Roll forward PJS with Pubsub fix

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #14339:
URL: https://github.com/apache/beam/pull/14339#issuecomment-809758603


   Run Java PreCommit





[GitHub] [beam] kennknowles commented on pull request #14339: Roll forward PJS with Pubsub fix

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #14339:
URL: https://github.com/apache/beam/pull/14339#issuecomment-809795088


   OK, before I edit I want to note the greenness :-)





[GitHub] [beam] kennknowles commented on pull request #14339: Roll forward PJS with Pubsub fix

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #14339:
URL: https://github.com/apache/beam/pull/14339#issuecomment-809726434


   run java postcommit





[GitHub] [beam] kennknowles commented on a change in pull request #14339: Roll forward PJS with Pubsub fix

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #14339:
URL: https://github.com/apache/beam/pull/14339#discussion_r603665586



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable and non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    if (useUnifiedWorker(options)) {
+      pipeline.replaceAll(getPortableOverrides());
+    }
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.info("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
+    // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
+    // the options on the new job
+    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options
+    LOG.info("Staging portable pipeline proto to {}", options.getStagingLocation());
+    byte[] serializedProtoPipeline = portablePipelineProto.toByteArray();
 
-    List<DataflowPackage> packages = stageArtifacts(pipelineProto);
+    DataflowPackage stagedPipeline =
+        options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
+    dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());

Review comment:
       I am happy to do it now to make progress. I think the end goal will be to have both paths: always do the portable job submission steps _and_ the non-portable job submission.
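
   One possible shape for that end state, purely as a sketch (`stagePortablePipeline` and `submitV1Job` are illustrative helper names, not existing methods):

   ```java
   // Hypothetical sketch of "both paths, always"; helper names are made up.
   RunnerApi.Pipeline portableProto =
       PipelineTranslation.toProto(pipeline, portableComponents, false);
   stagePortablePipeline(portableProto); // portable job submission steps, unconditionally

   replaceTransforms(pipeline); // v1 overrides, as in the current diff
   RunnerApi.Pipeline v1Proto =
       PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
   submitV1Job(v1Proto); // non-portable (v1beta3) submission, also unconditionally
   ```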







[GitHub] [beam] boyuanzz commented on a change in pull request #14339: Roll forward PJS with Pubsub fix

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #14339:
URL: https://github.com/apache/beam/pull/14339#discussion_r603487223



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -1150,6 +1208,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
 
     Job jobResult;
     try {
+      LOG.info("v1beta3 job: {}", newJob.toPrettyString());

Review comment:
       It will be extremely long in most cases. Do we want this shown in the user's console log?
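
   If it stays, one common pattern is to gate it at debug level so the dump is off by default (sketch against the slf4j `LOG` already used in this file):

   ```java
   // Only build and emit the dump when debug logging is enabled; the guard also
   // avoids evaluating toPrettyString() eagerly at higher log levels. Note that
   // newJob.toPrettyString() can throw IOException, so this belongs inside the
   // existing try block from the diff.
   if (LOG.isDebugEnabled()) {
     LOG.debug("v1beta3 job: {}", newJob.toPrettyString());
   }
   ```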

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");

Review comment:
       It seems like you decided to populate `use_portable_job_submission` from the SDK side, right?
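
   For reference, the add-if-absent idiom the diff repeats for `beam_fn_api` and `use_portable_job_submission` could be factored into a small helper along these lines (illustrative only, not part of the PR):

   ```java
   import java.util.ArrayList;
   import java.util.List;

   final class ExperimentUtil {
     private ExperimentUtil() {}

     // Returns a copy of the experiment list with the experiment added if absent.
     static List<String> withExperiment(List<String> experiments, String experiment) {
       // Copy first: the incoming list may be immutable.
       List<String> updated = new ArrayList<>(experiments);
       if (!updated.contains(experiment)) {
         updated.add(experiment);
       }
       return updated;
     }
   }
   ```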

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable and non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    if (useUnifiedWorker(options)) {
+      pipeline.replaceAll(getPortableOverrides());
+    }
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.info("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));

Review comment:
       Maybe change it back to `debug`?

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable and non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    if (useUnifiedWorker(options)) {
+      pipeline.replaceAll(getPortableOverrides());
+    }
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.info("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
+    // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
+    // the options on the new job
+    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options
+    LOG.info("Staging portable pipeline proto to {}", options.getStagingLocation());
+    byte[] serializedProtoPipeline = portablePipelineProto.toByteArray();
 
-    List<DataflowPackage> packages = stageArtifacts(pipelineProto);
+    DataflowPackage stagedPipeline =
+        options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
+    dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
+    // Now rewrite things to be as needed for v1 (mutates the pipeline)
+    replaceTransforms(pipeline);
+    // Capture the SdkComponents for lookup during step translations
+    SdkComponents dataflowV1Components = SdkComponents.create();
+    dataflowV1Components.registerEnvironment(
+        defaultEnvironmentForDataflow
+            .toBuilder()
+            .addAllDependencies(getDefaultArtifacts())
+            .addAllCapabilities(Environments.getJavaCapabilities())
+            .build());
+    RunnerApi.Pipeline dataflowV1PipelineProto =
+        PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
+    LOG.info("Dataflow v1 pipeline proto:\n{}", TextFormat.printToString(dataflowV1PipelineProto));
+    List<DataflowPackage> packages = stageArtifacts(dataflowV1PipelineProto);

Review comment:
       I thought we would not stage the portable pipeline proto for runner v1. If it's not easy to clean this up at the moment, a `TODO` might be worthwhile here.
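
   For example, something along these lines next to the staging call (wording illustrative):

   ```java
   // TODO: revisit whether artifacts should still be staged from the v1 proto
   // once portable job submission is the only path; see this review thread.
   List<DataflowPackage> packages = stageArtifacts(dataflowV1PipelineProto);
   ```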

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable and non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    if (useUnifiedWorker(options)) {
+      pipeline.replaceAll(getPortableOverrides());
+    }
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.info("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
+    // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
+    // the options on the new job
+    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options
+    LOG.info("Staging portable pipeline proto to {}", options.getStagingLocation());
+    byte[] serializedProtoPipeline = portablePipelineProto.toByteArray();
 
-    List<DataflowPackage> packages = stageArtifacts(pipelineProto);
+    DataflowPackage stagedPipeline =
+        options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
+    dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
+    // Now rewrite things to be as needed for v1 (mutates the pipeline)
+    replaceTransforms(pipeline);
+    // Capture the SdkComponents for lookup during step translations
+    SdkComponents dataflowV1Components = SdkComponents.create();
+    dataflowV1Components.registerEnvironment(
+        defaultEnvironmentForDataflow
+            .toBuilder()
+            .addAllDependencies(getDefaultArtifacts())
+            .addAllCapabilities(Environments.getJavaCapabilities())
+            .build());
+    RunnerApi.Pipeline dataflowV1PipelineProto =
+        PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
+    LOG.info("Dataflow v1 pipeline proto:\n{}", TextFormat.printToString(dataflowV1PipelineProto));

Review comment:
       `debug`?

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable and non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    if (useUnifiedWorker(options)) {
+      pipeline.replaceAll(getPortableOverrides());
+    }
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.info("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
+    // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
+    // the options on the new job
+    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options
+    LOG.info("Staging portable pipeline proto to {}", options.getStagingLocation());
+    byte[] serializedProtoPipeline = portablePipelineProto.toByteArray();
 
-    List<DataflowPackage> packages = stageArtifacts(pipelineProto);
+    DataflowPackage stagedPipeline =
+        options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
+    dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());

Review comment:
       How about having this block inside `if (useUnifiedWorker(options))`?
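
   i.e., something like this sketch (names from the diff):

   ```java
   // Stage the portable proto and set the pipeline URL only when the unified
   // worker is in use.
   if (useUnifiedWorker(options)) {
     DataflowPackage stagedPipeline =
         options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
     dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
   }
   ```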







[GitHub] [beam] kennknowles commented on pull request #14339: Roll forward PJS with Pubsub fix

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #14339:
URL: https://github.com/apache/beam/pull/14339#issuecomment-809726488





