You are viewing a plain-text version of this content. The canonical link for it (the "here" hyperlink in the original page) was lost in the plain-text conversion.
Posted to commits@beam.apache.org by xi...@apache.org on 2021/08/04 18:32:51 UTC

[beam] branch master updated: [BEAM-12671] Mark known composite transforms native (#15236)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 64188aa  [BEAM-12671] Mark known composite transforms native (#15236)
64188aa is described below

commit 64188aa8188de5a9e970fe456b9cf5187790613e
Author: Ke Wu <kw...@linkedin.com>
AuthorDate: Wed Aug 4 11:32:03 2021 -0700

    [BEAM-12671] Mark known composite transforms native (#15236)
---
 runners/samza/job-server/build.gradle              |  4 +++-
 .../beam/runners/samza/SamzaPipelineRunner.java    | 17 ++++++++++++--
 .../SamzaPortablePipelineTranslator.java           | 27 +++++++++++++++++++++-
 .../SamzaPortableTranslatorRegistrar.java          | 25 ++++++++++++++++++++
 .../runners/portability/samza_runner_test.py       |  6 +++++
 5 files changed, 75 insertions(+), 4 deletions(-)

diff --git a/runners/samza/job-server/build.gradle b/runners/samza/job-server/build.gradle
index bef6535..a8086c6 100644
--- a/runners/samza/job-server/build.gradle
+++ b/runners/samza/job-server/build.gradle
@@ -76,7 +76,7 @@ createPortableValidatesRunnerTask(
             excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
             // TODO: BEAM-12350
             excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics'
-
+            // TODO: BEAM-12681
             excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
             // Larger keys are possible, but they require more memory.
             excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above10MB'
@@ -101,6 +101,8 @@ createPortableValidatesRunnerTask(
             excludeCategories 'org.apache.beam.sdk.testing.UsesLoopingTimer'
         },
         testFilter: {
+            // TODO(BEAM-12677)
+            excludeTestsMatching "org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2"
             excludeTestsMatching "org.apache.beam.sdk.transforms.FlattenTest.testEmptyFlattenAsSideInput"
             excludeTestsMatching "org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmptyThenParDo"
             excludeTestsMatching "org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmpty"
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
index ef58588..bbdd1f5 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
@@ -20,13 +20,16 @@ package org.apache.beam.runners.samza;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
 import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
 import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
+import org.apache.beam.runners.core.construction.graph.TrivialNativeTransformExpander;
 import org.apache.beam.runners.core.construction.renderer.PipelineDotRenderer;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
 import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
+import org.apache.beam.runners.samza.translation.SamzaPortablePipelineTranslator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,9 +49,19 @@ public class SamzaPipelineRunner implements PortablePipelineRunner {
             pipeline,
             SplittableParDoExpander.createSizedReplacement());
 
+    // Don't let the fuser fuse any subcomponents of native transforms.
+    Pipeline trimmedPipeline =
+        TrivialNativeTransformExpander.forKnownUrns(
+            pipelineWithSdfExpanded, SamzaPortablePipelineTranslator.knownUrns());
+
     // Fused pipeline proto.
-    final RunnerApi.Pipeline fusedPipeline =
-        GreedyPipelineFuser.fuse(pipelineWithSdfExpanded).toPipeline();
+    // TODO: Consider supporting partially-fused graphs.
+    RunnerApi.Pipeline fusedPipeline =
+        trimmedPipeline.getComponents().getTransformsMap().values().stream()
+                .anyMatch(proto -> ExecutableStage.URN.equals(proto.getSpec().getUrn()))
+            ? trimmedPipeline
+            : GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
+
     LOG.info("Portable pipeline to run:");
     LOG.info(PipelineDotRenderer.toDotString(fusedPipeline));
     // the pipeline option coming from sdk will set the sdk specific runner which will break
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java
index fcca6bb..407cc5d 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java
@@ -17,10 +17,14 @@
  */
 package org.apache.beam.runners.samza.translation;
 
+import com.google.auto.service.AutoService;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.ServiceLoader;
+import java.util.Set;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
 import org.apache.beam.runners.samza.SamzaPipelineOptions;
@@ -33,6 +37,7 @@ import org.slf4j.LoggerFactory;
  * pipeline
  */
 @SuppressWarnings({
+  "keyfor",
   "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
@@ -43,7 +48,8 @@ public class SamzaPortablePipelineTranslator {
 
   private static Map<String, TransformTranslator<?>> loadTranslators() {
     Map<String, TransformTranslator<?>> translators = new HashMap<>();
-    for (SamzaTranslatorRegistrar registrar : ServiceLoader.load(SamzaTranslatorRegistrar.class)) {
+    for (SamzaPortableTranslatorRegistrar registrar :
+        ServiceLoader.load(SamzaPortableTranslatorRegistrar.class)) {
       translators.putAll(registrar.getTransformTranslators());
     }
     LOG.info("{} translators loaded.", translators.size());
@@ -85,4 +91,23 @@ public class SamzaPortablePipelineTranslator {
       }
     }
   }
+
+  public static Set<String> knownUrns() {
+    return TRANSLATORS.keySet();
+  }
+
+  /** Registers Samza translators. */
+  @AutoService(SamzaPortableTranslatorRegistrar.class)
+  public static class SamzaTranslators implements SamzaPortableTranslatorRegistrar {
+
+    @Override
+    public Map<String, TransformTranslator<?>> getTransformTranslators() {
+      return ImmutableMap.<String, TransformTranslator<?>>builder()
+          .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslator<>())
+          .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionsTranslator<>())
+          .put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new ImpulseTranslator())
+          .put(ExecutableStage.URN, new ParDoBoundMultiTranslator<>())
+          .build();
+    }
+  }
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortableTranslatorRegistrar.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortableTranslatorRegistrar.java
new file mode 100644
index 0000000..5eede8f
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortableTranslatorRegistrar.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import java.util.Map;
+
+/** A registrar of TransformTranslator in portable pipeline. */
+public interface SamzaPortableTranslatorRegistrar {
+  Map<String, TransformTranslator<?>> getTransformTranslators();
+}
diff --git a/sdks/python/apache_beam/runners/portability/samza_runner_test.py b/sdks/python/apache_beam/runners/portability/samza_runner_test.py
index bcb44e5..93a0418 100644
--- a/sdks/python/apache_beam/runners/portability/samza_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/samza_runner_test.py
@@ -137,6 +137,12 @@ class SamzaRunnerTest(portable_runner_test.PortableRunnerTest):
     # Skip until Samza portable runner supports distribution metrics.
     raise unittest.SkipTest("BEAM-12614")
 
+  def test_flattened_side_input(self):
+    # Blocked on support for transcoding
+    # https://issues.apache.org/jira/browse/BEAM-12681
+    super(SamzaRunnerTest,
+          self).test_flattened_side_input(with_transcoding=False)
+
   def test_pack_combiners(self):
     # Stages produced by translations.pack_combiners are fused
     # by translations.greedily_fuse, which prevent the stages