Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/01/03 00:23:00 UTC

[jira] [Work logged] (BEAM-11508) Set sdkHarnessContainerImages in workerpool configuration for Dataflow runner v2

     [ https://issues.apache.org/jira/browse/BEAM-11508?focusedWorklogId=530317&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-530317 ]

ASF GitHub Bot logged work on BEAM-11508:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Jan/21 00:22
            Start Date: 03/Jan/21 00:22
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on a change in pull request #13605:
URL: https://github.com/apache/beam/pull/13605#discussion_r550936568



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -1186,6 +1190,45 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     return dataflowPipelineJob;
   }
 
+  static void configureSdkHarnessContainerImages(
+      DataflowPipelineOptions options,
+      RunnerApi.Pipeline pipelineProto,
+      Job newJob,
+      String workerHarnessContainerImage) {
+    if (hasExperiment(options, "use_runner_v2") || hasExperiment(options, "use_unified_worker")) {
+      ImmutableSet.Builder<String> sdkContainerUrlSetBuilder = ImmutableSet.builder();
+      sdkContainerUrlSetBuilder.add(workerHarnessContainerImage);
+      for (Map.Entry<String, RunnerApi.Environment> entry :
+          pipelineProto.getComponents().getEnvironmentsMap().entrySet()) {
+        if (!BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)
+            .equals(entry.getValue().getUrn())) {
+          throw new RuntimeException(
+              "Dataflow can only execute pipeline steps in Docker environments: "
+                  + entry.getValue().getUrn());
+        }
+        RunnerApi.DockerPayload dockerPayload;
+        try {
+          dockerPayload = RunnerApi.DockerPayload.parseFrom(entry.getValue().getPayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing docker payload.", e);
+        }
+        sdkContainerUrlSetBuilder.add(dockerPayload.getContainerImage());

Review comment:
       It might make sense to de-dupe here in case the same Docker URL gets added twice. We can generalize this once Dataflow supports multiple environments with the same Docker URL, so please add a TODO for that.
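       A minimal sketch of what that could look like, reusing the names from the diff above; the TODO wording is only a suggestion:
   
           // ImmutableSet.Builder already drops duplicates on build(), so the same
           // Docker URL appearing in several environments yields a single entry.
           // TODO: generalize this once Dataflow supports multiple distinct
           // environments that share the same Docker image URL.
           ImmutableSet.Builder<String> sdkContainerUrlSetBuilder = ImmutableSet.builder();
           sdkContainerUrlSetBuilder.add(workerHarnessContainerImage);
           // ... inside the environment loop, after parsing the DockerPayload ...
           sdkContainerUrlSetBuilder.add(dockerPayload.getContainerImage());
           // build() returns a de-duplicated, insertion-ordered set of image URLs.
           ImmutableSet<String> sdkContainerUrls = sdkContainerUrlSetBuilder.build();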

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -1186,6 +1190,45 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     return dataflowPipelineJob;
   }
 
+  static void configureSdkHarnessContainerImages(
+      DataflowPipelineOptions options,
+      RunnerApi.Pipeline pipelineProto,
+      Job newJob,
+      String workerHarnessContainerImage) {
+    if (hasExperiment(options, "use_runner_v2") || hasExperiment(options, "use_unified_worker")) {
+      ImmutableSet.Builder<String> sdkContainerUrlSetBuilder = ImmutableSet.builder();
+      sdkContainerUrlSetBuilder.add(workerHarnessContainerImage);
+      for (Map.Entry<String, RunnerApi.Environment> entry :
+          pipelineProto.getComponents().getEnvironmentsMap().entrySet()) {
+        if (!BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)
+            .equals(entry.getValue().getUrn())) {
+          throw new RuntimeException(
+              "Dataflow can only execute pipeline steps in Docker environments: "
+                  + entry.getValue().getUrn());
+        }
+        RunnerApi.DockerPayload dockerPayload;
+        try {
+          dockerPayload = RunnerApi.DockerPayload.parseFrom(entry.getValue().getPayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing docker payload.", e);
+        }
+        sdkContainerUrlSetBuilder.add(dockerPayload.getContainerImage());
+      }
+      List<SdkHarnessContainerImage> sdkContainerList =

Review comment:
       Do we need any special handling for the pipeline SDK, similar to the following for Python?
   
   https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L285
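   
   If the intent of that Python code is to prefer the (possibly user-overridden) pipeline-SDK image for the environment that represents the pipeline's own SDK, a rough Java sketch could look like the following; isPipelineSdkEnvironment is a hypothetical helper, not something in the diff or the Dataflow API:
   
           // Hypothetical sketch: for the environment that represents the pipeline's
           // own SDK, use the already-resolved Java harness image so that user
           // overrides of the worker harness image are honored.
           String imageUrl = dockerPayload.getContainerImage();
           if (isPipelineSdkEnvironment(entry.getValue())) { // hypothetical helper
             imageUrl = workerHarnessContainerImage;
           }
           sdkContainerUrlSetBuilder.add(imageUrl);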

##########
File path: runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
##########
@@ -1342,6 +1347,57 @@ public void testTransformTranslator() throws IOException {
     assertTrue(transform.translated);
   }
 
+  @Test
+  public void testSdkHarnessConfiguration() throws IOException {

Review comment:
       Please also try this out with an actual Dataflow job on GCE (or the existing ITs).

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -1186,6 +1190,45 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     return dataflowPipelineJob;
   }
 
+  static void configureSdkHarnessContainerImages(
+      DataflowPipelineOptions options,
+      RunnerApi.Pipeline pipelineProto,
+      Job newJob,
+      String workerHarnessContainerImage) {
+    if (hasExperiment(options, "use_runner_v2") || hasExperiment(options, "use_unified_worker")) {
+      ImmutableSet.Builder<String> sdkContainerUrlSetBuilder = ImmutableSet.builder();
+      sdkContainerUrlSetBuilder.add(workerHarnessContainerImage);
+      for (Map.Entry<String, RunnerApi.Environment> entry :
+          pipelineProto.getComponents().getEnvironmentsMap().entrySet()) {
+        if (!BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)
+            .equals(entry.getValue().getUrn())) {
+          throw new RuntimeException(
+              "Dataflow can only execute pipeline steps in Docker environments: "
+                  + entry.getValue().getUrn());
+        }
+        RunnerApi.DockerPayload dockerPayload;
+        try {
+          dockerPayload = RunnerApi.DockerPayload.parseFrom(entry.getValue().getPayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing docker payload.", e);
+        }
+        sdkContainerUrlSetBuilder.add(dockerPayload.getContainerImage());
+      }
+      List<SdkHarnessContainerImage> sdkContainerList =
+          sdkContainerUrlSetBuilder.build().stream()
+              .map(
+                  (String url) -> {
+                    SdkHarnessContainerImage image = new SdkHarnessContainerImage();
+                    image.setContainerImage(url);
+                    return image;
+                  })
+              .collect(Collectors.toList());
+      for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
+        workerPool.setSdkHarnessContainerImages(sdkContainerList);

Review comment:
       For any Python environments, we should set 'useSingleCorePerContainer' to true for efficiency.
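   
   A hedged sketch of how the image list could be built with that flag, assuming SdkHarnessContainerImage exposes a useSingleCorePerContainer field and using a hypothetical isPythonContainerImage check (for example on the image URL or the environment's capabilities):
   
           SdkHarnessContainerImage image = new SdkHarnessContainerImage();
           image.setContainerImage(url);
           // Hypothetical check for Python SDK containers; only those get
           // single-core-per-container for efficiency.
           if (isPythonContainerImage(url)) {
             image.setUseSingleCorePerContainer(true);
           }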

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -1186,6 +1190,45 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     return dataflowPipelineJob;
   }
 
+  static void configureSdkHarnessContainerImages(
+      DataflowPipelineOptions options,
+      RunnerApi.Pipeline pipelineProto,
+      Job newJob,
+      String workerHarnessContainerImage) {
+    if (hasExperiment(options, "use_runner_v2") || hasExperiment(options, "use_unified_worker")) {

Review comment:
       Is there a util function we can depend on (similar to [1] for Python) instead of checking these properties separately for runner v2, which could lead to inconsistencies?
   
   [1] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L1039
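   
   One possible shape for such a helper, sketched here as an assumption rather than an existing API, would centralize the experiment check in DataflowRunner so callers cannot drift apart:
   
           /** Hypothetical helper: true if the job should use Dataflow runner v2 (unified worker). */
           static boolean useUnifiedWorker(DataflowPipelineOptions options) {
             return hasExperiment(options, "use_runner_v2")
                 || hasExperiment(options, "use_unified_worker");
           }
   
   configureSdkHarnessContainerImages could then start with if (useUnifiedWorker(options)) instead of repeating the two hasExperiment calls.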




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 530317)
    Time Spent: 0.5h  (was: 20m)

> Set sdkHarnessContainerImages in workerpool configuration for Dataflow runner v2
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-11508
>                 URL: https://issues.apache.org/jira/browse/BEAM-11508
>             Project: Beam
>          Issue Type: Improvement
>          Components: cross-language, runner-dataflow
>            Reporter: Heejong Lee
>            Assignee: Heejong Lee
>            Priority: P2
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Set sdkHarnessContainerImages in workerpool configuration for Dataflow runner v2



--
This message was sent by Atlassian Jira
(v8.3.4#803005)