Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/23 07:47:19 UTC

[GitHub] [beam] ihji opened a new pull request #13605: [BEAM-11508] Set sdkHarnessContainerImages in workerpool configuratio…

ihji opened a new pull request #13605:
URL: https://github.com/apache/beam/pull/13605


   …n for Dataflow runner v2
   
   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website | Whitespace | Typescript
   --- | --- | --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/) <br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | --- | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ihji commented on a change in pull request #13605: [BEAM-11508] Set sdkHarnessContainerImages in workerpool configuratio…

Posted by GitBox <gi...@apache.org>.
ihji commented on a change in pull request #13605:
URL: https://github.com/apache/beam/pull/13605#discussion_r552295607



##########
File path: runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
##########
@@ -1342,6 +1347,57 @@ public void testTransformTranslator() throws IOException {
     assertTrue(transform.translated);
   }
 
+  @Test
+  public void testSdkHarnessConfiguration() throws IOException {

Review comment:
       The change applies to all existing ITs with the use_runner_v2 flag (for example, the Runner v2 ValidatesRunner tests should fail if something goes wrong with this commit). Do you think we need a separate test only for this change?







[GitHub] [beam] ihji merged pull request #13605: [BEAM-11508] Set sdkHarnessContainerImages in workerpool configuratio…

Posted by GitBox <gi...@apache.org>.
ihji merged pull request #13605:
URL: https://github.com/apache/beam/pull/13605


   





[GitHub] [beam] chamikaramj commented on a change in pull request #13605: [BEAM-11508] Set sdkHarnessContainerImages in workerpool configuratio…

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #13605:
URL: https://github.com/apache/beam/pull/13605#discussion_r550936568



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -1186,6 +1190,45 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     return dataflowPipelineJob;
   }
 
+  static void configureSdkHarnessContainerImages(
+      DataflowPipelineOptions options,
+      RunnerApi.Pipeline pipelineProto,
+      Job newJob,
+      String workerHarnessContainerImage) {
+    if (hasExperiment(options, "use_runner_v2") || hasExperiment(options, "use_unified_worker")) {
+      ImmutableSet.Builder<String> sdkContainerUrlSetBuilder = ImmutableSet.builder();
+      sdkContainerUrlSetBuilder.add(workerHarnessContainerImage);
+      for (Map.Entry<String, RunnerApi.Environment> entry :
+          pipelineProto.getComponents().getEnvironmentsMap().entrySet()) {
+        if (!BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)
+            .equals(entry.getValue().getUrn())) {
+          throw new RuntimeException(
+              "Dataflow can only execute pipeline steps in Docker environments: "
+                  + entry.getValue().getUrn());
+        }
+        RunnerApi.DockerPayload dockerPayload;
+        try {
+          dockerPayload = RunnerApi.DockerPayload.parseFrom(entry.getValue().getPayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing docker payload.", e);
+        }
+        sdkContainerUrlSetBuilder.add(dockerPayload.getContainerImage());

Review comment:
       It might make sense to de-dupe here in case the same Docker URL gets added twice (we can generalize this when Dataflow supports multiple environments with the same Docker URL, so please add a TODO).
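
    For illustration, a minimal sketch of the de-duping being discussed, reusing the proto types already shown in the diff above (imports are written unvendored for brevity); the class and method names are hypothetical and not part of the PR. As noted later in the thread, the PR's `ImmutableSet.Builder` already gives this behavior; the `LinkedHashSet` here just makes it explicit.

```java
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;

class SdkContainerUrls {
  // Sketch: collect container image URLs, collapsing repeated Docker URLs into one entry
  // while preserving insertion order (the pipeline SDK image stays first).
  static Set<String> collect(RunnerApi.Pipeline pipelineProto, String workerHarnessContainerImage) {
    Set<String> urls = new LinkedHashSet<>();
    urls.add(workerHarnessContainerImage);
    for (RunnerApi.Environment env : pipelineProto.getComponents().getEnvironmentsMap().values()) {
      try {
        // TODO: generalize once Dataflow supports multiple environments sharing one Docker URL.
        urls.add(RunnerApi.DockerPayload.parseFrom(env.getPayload()).getContainerImage());
      } catch (InvalidProtocolBufferException e) {
        throw new RuntimeException("Error parsing docker payload.", e);
      }
    }
    return urls;
  }
}
```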

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -1186,6 +1190,45 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     return dataflowPipelineJob;
   }
 
+  static void configureSdkHarnessContainerImages(
+      DataflowPipelineOptions options,
+      RunnerApi.Pipeline pipelineProto,
+      Job newJob,
+      String workerHarnessContainerImage) {
+    if (hasExperiment(options, "use_runner_v2") || hasExperiment(options, "use_unified_worker")) {
+      ImmutableSet.Builder<String> sdkContainerUrlSetBuilder = ImmutableSet.builder();
+      sdkContainerUrlSetBuilder.add(workerHarnessContainerImage);
+      for (Map.Entry<String, RunnerApi.Environment> entry :
+          pipelineProto.getComponents().getEnvironmentsMap().entrySet()) {
+        if (!BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)
+            .equals(entry.getValue().getUrn())) {
+          throw new RuntimeException(
+              "Dataflow can only execute pipeline steps in Docker environments: "
+                  + entry.getValue().getUrn());
+        }
+        RunnerApi.DockerPayload dockerPayload;
+        try {
+          dockerPayload = RunnerApi.DockerPayload.parseFrom(entry.getValue().getPayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing docker payload.", e);
+        }
+        sdkContainerUrlSetBuilder.add(dockerPayload.getContainerImage());
+      }
+      List<SdkHarnessContainerImage> sdkContainerList =

Review comment:
       Do we need any special handling for the pipeline SDK, similar to the following for Python?
   
   https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L285

##########
File path: runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
##########
@@ -1342,6 +1347,57 @@ public void testTransformTranslator() throws IOException {
     assertTrue(transform.translated);
   }
 
+  @Test
+  public void testSdkHarnessConfiguration() throws IOException {

Review comment:
       Please also try out a Dataflow GCE job (or existing ITs).

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -1186,6 +1190,45 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     return dataflowPipelineJob;
   }
 
+  static void configureSdkHarnessContainerImages(
+      DataflowPipelineOptions options,
+      RunnerApi.Pipeline pipelineProto,
+      Job newJob,
+      String workerHarnessContainerImage) {
+    if (hasExperiment(options, "use_runner_v2") || hasExperiment(options, "use_unified_worker")) {
+      ImmutableSet.Builder<String> sdkContainerUrlSetBuilder = ImmutableSet.builder();
+      sdkContainerUrlSetBuilder.add(workerHarnessContainerImage);
+      for (Map.Entry<String, RunnerApi.Environment> entry :
+          pipelineProto.getComponents().getEnvironmentsMap().entrySet()) {
+        if (!BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)
+            .equals(entry.getValue().getUrn())) {
+          throw new RuntimeException(
+              "Dataflow can only execute pipeline steps in Docker environments: "
+                  + entry.getValue().getUrn());
+        }
+        RunnerApi.DockerPayload dockerPayload;
+        try {
+          dockerPayload = RunnerApi.DockerPayload.parseFrom(entry.getValue().getPayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing docker payload.", e);
+        }
+        sdkContainerUrlSetBuilder.add(dockerPayload.getContainerImage());
+      }
+      List<SdkHarnessContainerImage> sdkContainerList =
+          sdkContainerUrlSetBuilder.build().stream()
+              .map(
+                  (String url) -> {
+                    SdkHarnessContainerImage image = new SdkHarnessContainerImage();
+                    image.setContainerImage(url);
+                    return image;
+                  })
+              .collect(Collectors.toList());
+      for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
+        workerPool.setSdkHarnessContainerImages(sdkContainerList);

Review comment:
       For any Python environments, we should set 'useSingleCorePerContainer' to true for efficiency.
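
    A hedged sketch of what that could look like; it assumes the Dataflow API model exposes a `setUseSingleCorePerContainer` setter for the field named above, and the `url.contains("python")` check is a purely illustrative heuristic, not how Dataflow actually identifies Python environments.

```java
import com.google.api.services.dataflow.model.SdkHarnessContainerImage;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class HarnessImages {
  // Sketch: build the worker-pool image list, marking Python harness containers single-core.
  static List<SdkHarnessContainerImage> fromUrls(Set<String> sdkContainerUrls) {
    List<SdkHarnessContainerImage> images = new ArrayList<>();
    for (String url : sdkContainerUrls) {
      SdkHarnessContainerImage image = new SdkHarnessContainerImage();
      image.setContainerImage(url);
      // Assumed heuristic for illustration only: treat images whose name mentions "python"
      // as Python SDK harnesses and run them single-core per container for efficiency.
      if (url.contains("python")) {
        image.setUseSingleCorePerContainer(true);
      }
      images.add(image);
    }
    return images;
  }
}
```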

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -1186,6 +1190,45 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     return dataflowPipelineJob;
   }
 
+  static void configureSdkHarnessContainerImages(
+      DataflowPipelineOptions options,
+      RunnerApi.Pipeline pipelineProto,
+      Job newJob,
+      String workerHarnessContainerImage) {
+    if (hasExperiment(options, "use_runner_v2") || hasExperiment(options, "use_unified_worker")) {

Review comment:
       Is there a util function that we can depend on (similar to [1] for Python) instead of checking these properties separately for Runner v2, which could lead to inconsistencies?
   
   [1] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L1039







[GitHub] [beam] ihji commented on a change in pull request #13605: [BEAM-11508] Set sdkHarnessContainerImages in workerpool configuratio…

Posted by GitBox <gi...@apache.org>.
ihji commented on a change in pull request #13605:
URL: https://github.com/apache/beam/pull/13605#discussion_r552292154



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -1186,6 +1190,45 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     return dataflowPipelineJob;
   }
 
+  static void configureSdkHarnessContainerImages(
+      DataflowPipelineOptions options,
+      RunnerApi.Pipeline pipelineProto,
+      Job newJob,
+      String workerHarnessContainerImage) {
+    if (hasExperiment(options, "use_runner_v2") || hasExperiment(options, "use_unified_worker")) {

Review comment:
       Added a utility function to check the Runner v2 flags.
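
    A minimal sketch of such a helper, folding together only the two experiment flags already checked in this diff and reusing the `hasExperiment` utility from `org.apache.beam.sdk.options.ExperimentalOptions`; the class and method names are illustrative, not necessarily what was merged.

```java
import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;

class RunnerV2Flags {
  // Sketch: one place to decide whether the job targets Runner v2 / the unified worker,
  // so callers do not check the individual experiment strings inconsistently.
  static boolean useUnifiedWorker(DataflowPipelineOptions options) {
    return hasExperiment(options, "use_runner_v2") || hasExperiment(options, "use_unified_worker");
  }
}
```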









[GitHub] [beam] ihji commented on a change in pull request #13605: [BEAM-11508] Set sdkHarnessContainerImages in workerpool configuratio…

Posted by GitBox <gi...@apache.org>.
ihji commented on a change in pull request #13605:
URL: https://github.com/apache/beam/pull/13605#discussion_r552292639



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -1186,6 +1190,45 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     return dataflowPipelineJob;
   }
 
+  static void configureSdkHarnessContainerImages(
+      DataflowPipelineOptions options,
+      RunnerApi.Pipeline pipelineProto,
+      Job newJob,
+      String workerHarnessContainerImage) {
+    if (hasExperiment(options, "use_runner_v2") || hasExperiment(options, "use_unified_worker")) {
+      ImmutableSet.Builder<String> sdkContainerUrlSetBuilder = ImmutableSet.builder();
+      sdkContainerUrlSetBuilder.add(workerHarnessContainerImage);
+      for (Map.Entry<String, RunnerApi.Environment> entry :
+          pipelineProto.getComponents().getEnvironmentsMap().entrySet()) {
+        if (!BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)
+            .equals(entry.getValue().getUrn())) {
+          throw new RuntimeException(
+              "Dataflow can only execute pipeline steps in Docker environments: "
+                  + entry.getValue().getUrn());
+        }
+        RunnerApi.DockerPayload dockerPayload;
+        try {
+          dockerPayload = RunnerApi.DockerPayload.parseFrom(entry.getValue().getPayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing docker payload.", e);
+        }
+        sdkContainerUrlSetBuilder.add(dockerPayload.getContainerImage());
+      }
+      List<SdkHarnessContainerImage> sdkContainerList =

Review comment:
       The pipeline SDK container is passed in as a parameter and is the first entry added to the container URL set (the same as the Python code).







[GitHub] [beam] ihji commented on a change in pull request #13605: [BEAM-11508] Set sdkHarnessContainerImages in workerpool configuratio…

Posted by GitBox <gi...@apache.org>.
ihji commented on a change in pull request #13605:
URL: https://github.com/apache/beam/pull/13605#discussion_r552292914



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -1186,6 +1190,45 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     return dataflowPipelineJob;
   }
 
+  static void configureSdkHarnessContainerImages(
+      DataflowPipelineOptions options,
+      RunnerApi.Pipeline pipelineProto,
+      Job newJob,
+      String workerHarnessContainerImage) {
+    if (hasExperiment(options, "use_runner_v2") || hasExperiment(options, "use_unified_worker")) {
+      ImmutableSet.Builder<String> sdkContainerUrlSetBuilder = ImmutableSet.builder();
+      sdkContainerUrlSetBuilder.add(workerHarnessContainerImage);
+      for (Map.Entry<String, RunnerApi.Environment> entry :
+          pipelineProto.getComponents().getEnvironmentsMap().entrySet()) {
+        if (!BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)
+            .equals(entry.getValue().getUrn())) {
+          throw new RuntimeException(
+              "Dataflow can only execute pipeline steps in Docker environments: "
+                  + entry.getValue().getUrn());
+        }
+        RunnerApi.DockerPayload dockerPayload;
+        try {
+          dockerPayload = RunnerApi.DockerPayload.parseFrom(entry.getValue().getPayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing docker payload.", e);
+        }
+        sdkContainerUrlSetBuilder.add(dockerPayload.getContainerImage());

Review comment:
       We are collecting the URLs in an ImmutableSet, so there will be at most one entry for any given container URL.
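
    A quick standalone illustration of that property (the image URLs below are made up for the example):

```java
import com.google.common.collect.ImmutableSet;

public class DedupeExample {
  public static void main(String[] args) {
    // ImmutableSet.Builder silently drops repeated elements and keeps first-insertion order.
    ImmutableSet<String> urls =
        ImmutableSet.<String>builder()
            .add("gcr.io/example/java-sdk:2.27.0") // hypothetical image URLs
            .add("gcr.io/example/python-sdk:2.27.0")
            .add("gcr.io/example/java-sdk:2.27.0") // duplicate, collapsed into one entry
            .build();
    System.out.println(urls); // [gcr.io/example/java-sdk:2.27.0, gcr.io/example/python-sdk:2.27.0]
  }
}
```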







[GitHub] [beam] ihji commented on pull request #13605: [BEAM-11508] Set sdkHarnessContainerImages in workerpool configuratio…

Posted by GitBox <gi...@apache.org>.
ihji commented on pull request #13605:
URL: https://github.com/apache/beam/pull/13605#issuecomment-750438388


   R: @chamikaramj





[GitHub] [beam] ihji commented on a change in pull request #13605: [BEAM-11508] Set sdkHarnessContainerImages in workerpool configuratio…

Posted by GitBox <gi...@apache.org>.
ihji commented on a change in pull request #13605:
URL: https://github.com/apache/beam/pull/13605#discussion_r552293219



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -1186,6 +1190,45 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     return dataflowPipelineJob;
   }
 
+  static void configureSdkHarnessContainerImages(
+      DataflowPipelineOptions options,
+      RunnerApi.Pipeline pipelineProto,
+      Job newJob,
+      String workerHarnessContainerImage) {
+    if (hasExperiment(options, "use_runner_v2") || hasExperiment(options, "use_unified_worker")) {
+      ImmutableSet.Builder<String> sdkContainerUrlSetBuilder = ImmutableSet.builder();
+      sdkContainerUrlSetBuilder.add(workerHarnessContainerImage);
+      for (Map.Entry<String, RunnerApi.Environment> entry :
+          pipelineProto.getComponents().getEnvironmentsMap().entrySet()) {
+        if (!BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)
+            .equals(entry.getValue().getUrn())) {
+          throw new RuntimeException(
+              "Dataflow can only execute pipeline steps in Docker environments: "
+                  + entry.getValue().getUrn());
+        }
+        RunnerApi.DockerPayload dockerPayload;
+        try {
+          dockerPayload = RunnerApi.DockerPayload.parseFrom(entry.getValue().getPayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing docker payload.", e);
+        }
+        sdkContainerUrlSetBuilder.add(dockerPayload.getContainerImage());
+      }
+      List<SdkHarnessContainerImage> sdkContainerList =
+          sdkContainerUrlSetBuilder.build().stream()
+              .map(
+                  (String url) -> {
+                    SdkHarnessContainerImage image = new SdkHarnessContainerImage();
+                    image.setContainerImage(url);
+                    return image;
+                  })
+              .collect(Collectors.toList());
+      for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
+        workerPool.setSdkHarnessContainerImages(sdkContainerList);

Review comment:
       Done.







[GitHub] [beam] chamikaramj commented on a change in pull request #13605: [BEAM-11508] Set sdkHarnessContainerImages in workerpool configuratio…

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #13605:
URL: https://github.com/apache/beam/pull/13605#discussion_r553719025



##########
File path: runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
##########
@@ -1342,6 +1347,57 @@ public void testTransformTranslator() throws IOException {
     assertTrue(transform.translated);
   }
 
+  @Test
+  public void testSdkHarnessConfiguration() throws IOException {

Review comment:
       No, existing Runner v2 ITs should be adequate. Thanks.



