You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/29 20:31:03 UTC

[GitHub] [beam] kennknowles commented on a change in pull request #14339: Roll forward PJS with Pubsub fix

kennknowles commented on a change in pull request #14339:
URL: https://github.com/apache/beam/pull/14339#discussion_r603591147



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable an non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    if (useUnifiedWorker(options)) {
+      pipeline.replaceAll(getPortableOverrides());
+    }
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.info("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));

Review comment:
       It is part of faee65e220dcca094a3c7ef8e398430545374470 "DO NOT SUBMIT: Logging" which I will drop

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable an non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    if (useUnifiedWorker(options)) {
+      pipeline.replaceAll(getPortableOverrides());
+    }
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.info("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
+    // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
+    // the options on the new job
+    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options
+    LOG.info("Staging portable pipeline proto to {}", options.getStagingLocation());
+    byte[] serializedProtoPipeline = portablePipelineProto.toByteArray();
 
-    List<DataflowPackage> packages = stageArtifacts(pipelineProto);
+    DataflowPackage stagedPipeline =
+        options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
+    dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
+    // Now rewrite things to be as needed for v1 (mutates the pipeline)
+    replaceTransforms(pipeline);
+    // Capture the SdkComponents for look up during step translations
+    SdkComponents dataflowV1Components = SdkComponents.create();
+    dataflowV1Components.registerEnvironment(
+        defaultEnvironmentForDataflow
+            .toBuilder()
+            .addAllDependencies(getDefaultArtifacts())
+            .addAllCapabilities(Environments.getJavaCapabilities())
+            .build());
+    RunnerApi.Pipeline dataflowV1PipelineProto =
+        PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
+    LOG.info("Dataflow v1 pipeline proto:\n{}", TextFormat.printToString(dataflowV1PipelineProto));

Review comment:
       It is part of faee65e220dcca094a3c7ef8e398430545374470 "DO NOT SUBMIT: Logging" which I will drop

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable an non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    if (useUnifiedWorker(options)) {
+      pipeline.replaceAll(getPortableOverrides());
+    }
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.info("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
+    // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
+    // the options on the new job
+    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options
+    LOG.info("Staging portable pipeline proto to {}", options.getStagingLocation());
+    byte[] serializedProtoPipeline = portablePipelineProto.toByteArray();
 
-    List<DataflowPackage> packages = stageArtifacts(pipelineProto);
+    DataflowPackage stagedPipeline =
+        options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
+    dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());

Review comment:
       I could do that if it adds something. Eventually the goal is to eliminate all conditionals of "useUnifiedWorker" so that a valid v1beta3 and a valid UW submission are created no matter what.

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -931,39 +953,71 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       if (!experiments.contains("beam_fn_api")) {
         experiments.add("beam_fn_api");
       }
-      options.setExperiments(experiments);
+      if (!experiments.contains("use_portable_job_submission")) {
+        experiments.add("use_portable_job_submission");
+      }
+      options.setExperiments(ImmutableList.copyOf(experiments));
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     if (containsUnboundedPCollection(pipeline)) {
       options.setStreaming(true);
     }
-    replaceTransforms(pipeline);
 
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
 
-    // Capture the sdkComponents for look up during step translations
-    SdkComponents sdkComponents = SdkComponents.create();
-
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+
+    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
+    // with the SDK harness image (which implements Fn API).
+    //
+    // The same Environment is used in different and contradictory ways, depending on whether
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    sdkComponents.registerEnvironment(
+    // The SdkComponents for portable an non-portable job submission must be kept distinct. Both
+    // need the default environment.
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
         defaultEnvironmentForDataflow
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
             .build());
 
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
-
-    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
+    if (useUnifiedWorker(options)) {
+      pipeline.replaceAll(getPortableOverrides());
+    }
+    RunnerApi.Pipeline portablePipelineProto =
+        PipelineTranslation.toProto(pipeline, portableComponents, false);
+    LOG.info("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
+    // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
+    // the options on the new job
+    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options
+    LOG.info("Staging portable pipeline proto to {}", options.getStagingLocation());
+    byte[] serializedProtoPipeline = portablePipelineProto.toByteArray();
 
-    List<DataflowPackage> packages = stageArtifacts(pipelineProto);
+    DataflowPackage stagedPipeline =
+        options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
+    dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
+    // Now rewrite things to be as needed for v1 (mutates the pipeline)
+    replaceTransforms(pipeline);
+    // Capture the SdkComponents for look up during step translations
+    SdkComponents dataflowV1Components = SdkComponents.create();
+    dataflowV1Components.registerEnvironment(
+        defaultEnvironmentForDataflow
+            .toBuilder()
+            .addAllDependencies(getDefaultArtifacts())
+            .addAllCapabilities(Environments.getJavaCapabilities())
+            .build());
+    RunnerApi.Pipeline dataflowV1PipelineProto =
+        PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
+    LOG.info("Dataflow v1 pipeline proto:\n{}", TextFormat.printToString(dataflowV1PipelineProto));
+    List<DataflowPackage> packages = stageArtifacts(dataflowV1PipelineProto);

Review comment:
       This is why the environment was missing in the failing jobs earlier IIRC. I've now forgotten. I will diff against the failing PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org