Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/11 02:22:35 UTC

[GitHub] [beam] boyuanzz commented on a change in pull request #14074: Always use portable job submission for Dataflow V2

boyuanzz commented on a change in pull request #14074:
URL: https://github.com/apache/beam/pull/14074#discussion_r592020157



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -927,39 +927,68 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly conflates the worker harness container image (which implements the
+    // v1beta3 API) with the SDK harness image (which implements the Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable and non-portable job submission must be kept
+    // distinct. Both need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
+    // Stage the portable pipeline proto, retrieve the staged pipeline path, then update
+    // the options on the new job with that path.
+    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options
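
For context, a minimal standalone sketch of the opt-in this hunk automates: before
the change, a Runner v2 job had to add both experiments itself, while the PR makes
DataflowRunner add them automatically on the v2 path. The class name and the empty
pipeline body below are illustrative only; ExperimentalOptions.addExperiment is the
public Beam API for mutating the experiments list.

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.ExperimentalOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class RunnerV2OptInSketch {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);

        // What this PR automates: after the change, DataflowRunner adds
        // "use_portable_job_submission" itself whenever "beam_fn_api" is set,
        // so neither call below is needed on the Runner v2 path.
        ExperimentalOptions.addExperiment(options, "beam_fn_api");
        ExperimentalOptions.addExperiment(options, "use_portable_job_submission");

        Pipeline pipeline = Pipeline.create(options);
        // ... build transforms here ...
        pipeline.run();
      }
    }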

Review comment:
       Could you please share more details on this TODO? For example, why are pipeline options not the preferred way to pass the pipeline?
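
To make the question concrete, here is a toy model of the trade-off the TODO points
at. None of these types are real Dataflow APIs; they only contrast data smuggled
through an untyped options map with data passed as an explicit, typed parameter of
the submission.

    import java.util.HashMap;
    import java.util.Map;

    public class SubmissionShapeSketch {
      // (a) Pipeline-options style: the staged pipeline path is one more entry
      //     in an untyped string map, so nothing in the signature says it must
      //     exist, and a missing entry fails only at runtime.
      static void submitViaOptions(Map<String, String> options) {
        String pipelinePath = options.get("pipelineUrl"); // may silently be null
        System.out.println("submitting pipeline at " + pipelinePath);
      }

      // (b) Explicit-parameter style: the staged pipeline path is part of the
      //     submission contract, visible in the signature and never optional.
      static void submitExplicit(Map<String, String> options, String pipelinePath) {
        System.out.println("submitting pipeline at " + pipelinePath);
      }

      public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("pipelineUrl", "gs://bucket/staging/pipeline.pb");
        submitViaOptions(options);
        submitExplicit(options, "gs://bucket/staging/pipeline.pb");
      }
    }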



