Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/10 21:35:35 UTC

[GitHub] [beam] robertwb commented on a diff in pull request #21788: Automatically enable Runner v2 for pipelines that use cross-language transforms.

robertwb commented on code in PR #21788:
URL: https://github.com/apache/beam/pull/21788#discussion_r894903915


##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java:
##########
@@ -1038,8 +1038,47 @@ private List<RunnerApi.ArtifactInformation> getDefaultArtifacts() {
     return Environments.getArtifacts(pathsToStageBuilder.build());
   }
 
+  @VisibleForTesting
+  static boolean isMultiLanguagePipeline(Pipeline pipeline) {
+    class IsMultiLanguageVisitor extends PipelineVisitor.Defaults {
+      private boolean isMultiLanguage = false;
+
+      private void performMultiLanguageTest(Node node) {
+        if (node.getTransform() instanceof External.ExpandableTransform) {
+          isMultiLanguage = true;
+        }
+      }
+
+      @Override
+      public CompositeBehavior enterCompositeTransform(Node node) {
+        performMultiLanguageTest(node);
+        return super.enterCompositeTransform(node);
+      }
+
+      @Override
+      public void visitPrimitiveTransform(Node node) {
+        performMultiLanguageTest(node);
+        super.visitPrimitiveTransform(node);
+      }
+    }
+
+    IsMultiLanguageVisitor visitor = new IsMultiLanguageVisitor();
+    pipeline.traverseTopologically(visitor);
+
+    return visitor.isMultiLanguage;
+  }
+
   @Override
   public DataflowPipelineJob run(Pipeline pipeline) {
+    if (DataflowRunner.isMultiLanguagePipeline(pipeline)) {
+      List<String> experiments = options.getExperiments();
+      if (!experiments.contains("use_runner_v2")) {
+        LOG.info(
+            "Automatically enabling Dataflow Runner v2 since the pipeline used cross-language"
+                + " transforms");
+        experiments.add("use_runner_v2");

Review Comment:
   Do we have to add portable_job_submission as well, or is that not already automatic for all Java pipelines?
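
   If it does turn out to be needed, a minimal sketch of adding both experiments could look like the following. This is plain Java with no Beam dependency. The class and method names (ExperimentSketch, withExperiments) are hypothetical and not part of Beam. It also assumes the experiments list may be null or unmodifiable, so it returns a copy rather than mutating the list in place. Whether portable_job_submission is actually required is still the open question above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of the Beam codebase.
final class ExperimentSketch {
  private ExperimentSketch() {}

  /**
   * Returns a new list holding the existing experiments followed by any
   * requested experiments that were missing. A null input is treated as empty
   * (assumption: the options getter may return null when nothing is set).
   */
  static List<String> withExperiments(List<String> existing, String... toAdd) {
    List<String> result = existing == null ? new ArrayList<>() : new ArrayList<>(existing);
    for (String experiment : toAdd) {
      // Skip experiments that are already present so repeated calls are idempotent.
      if (!result.contains(experiment)) {
        result.add(experiment);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> updated =
        withExperiments(
            Arrays.asList("use_runner_v2"), "use_runner_v2", "portable_job_submission");
    System.out.println(updated);
  }
}
```

   In the runner, the result would then be written back with something like options.setExperiments(...) instead of calling add() on the list returned by getExperiments().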


