You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@beam.apache.org by xi...@apache.org on 2021/08/04 18:32:51 UTC
[beam] branch master updated: [BEAM-12671] Mark known composite transforms native (#15236)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 64188aa [BEAM-12671] Mark known composite transforms native (#15236)
64188aa is described below
commit 64188aa8188de5a9e970fe456b9cf5187790613e
Author: Ke Wu <kw...@linkedin.com>
AuthorDate: Wed Aug 4 11:32:03 2021 -0700
[BEAM-12671] Mark known composite transforms native (#15236)
---
runners/samza/job-server/build.gradle | 4 +++-
.../beam/runners/samza/SamzaPipelineRunner.java | 17 ++++++++++++--
.../SamzaPortablePipelineTranslator.java | 27 +++++++++++++++++++++-
.../SamzaPortableTranslatorRegistrar.java | 25 ++++++++++++++++++++
.../runners/portability/samza_runner_test.py | 6 +++++
5 files changed, 75 insertions(+), 4 deletions(-)
diff --git a/runners/samza/job-server/build.gradle b/runners/samza/job-server/build.gradle
index bef6535..a8086c6 100644
--- a/runners/samza/job-server/build.gradle
+++ b/runners/samza/job-server/build.gradle
@@ -76,7 +76,7 @@ createPortableValidatesRunnerTask(
excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
// TODO: BEAM-12350
excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics'
-
+ // TODO: BEAM-12681
excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
// Larger keys are possible, but they require more memory.
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above10MB'
@@ -101,6 +101,8 @@ createPortableValidatesRunnerTask(
excludeCategories 'org.apache.beam.sdk.testing.UsesLoopingTimer'
},
testFilter: {
+ // TODO(BEAM-12677)
+ excludeTestsMatching "org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2"
excludeTestsMatching "org.apache.beam.sdk.transforms.FlattenTest.testEmptyFlattenAsSideInput"
excludeTestsMatching "org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmptyThenParDo"
excludeTestsMatching "org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmpty"
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
index ef58588..bbdd1f5 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
@@ -20,13 +20,16 @@ package org.apache.beam.runners.samza;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
+import org.apache.beam.runners.core.construction.graph.TrivialNativeTransformExpander;
import org.apache.beam.runners.core.construction.renderer.PipelineDotRenderer;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
+import org.apache.beam.runners.samza.translation.SamzaPortablePipelineTranslator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,9 +49,19 @@ public class SamzaPipelineRunner implements PortablePipelineRunner {
pipeline,
SplittableParDoExpander.createSizedReplacement());
+ // Don't let the fuser fuse any subcomponents of native transforms.
+ Pipeline trimmedPipeline =
+ TrivialNativeTransformExpander.forKnownUrns(
+ pipelineWithSdfExpanded, SamzaPortablePipelineTranslator.knownUrns());
+
// Fused pipeline proto.
- final RunnerApi.Pipeline fusedPipeline =
- GreedyPipelineFuser.fuse(pipelineWithSdfExpanded).toPipeline();
+ // TODO: Consider supporting partially-fused graphs.
+ RunnerApi.Pipeline fusedPipeline =
+ trimmedPipeline.getComponents().getTransformsMap().values().stream()
+ .anyMatch(proto -> ExecutableStage.URN.equals(proto.getSpec().getUrn()))
+ ? trimmedPipeline
+ : GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
+
LOG.info("Portable pipeline to run:");
LOG.info(PipelineDotRenderer.toDotString(fusedPipeline));
// the pipeline option coming from sdk will set the sdk specific runner which will break
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java
index fcca6bb..407cc5d 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java
@@ -17,10 +17,14 @@
*/
package org.apache.beam.runners.samza.translation;
+import com.google.auto.service.AutoService;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
+import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
@@ -33,6 +37,7 @@ import org.slf4j.LoggerFactory;
* pipeline
*/
@SuppressWarnings({
+ "keyfor",
"rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
@@ -43,7 +48,8 @@ public class SamzaPortablePipelineTranslator {
private static Map<String, TransformTranslator<?>> loadTranslators() {
Map<String, TransformTranslator<?>> translators = new HashMap<>();
- for (SamzaTranslatorRegistrar registrar : ServiceLoader.load(SamzaTranslatorRegistrar.class)) {
+ for (SamzaPortableTranslatorRegistrar registrar :
+ ServiceLoader.load(SamzaPortableTranslatorRegistrar.class)) {
translators.putAll(registrar.getTransformTranslators());
}
LOG.info("{} translators loaded.", translators.size());
@@ -85,4 +91,23 @@ public class SamzaPortablePipelineTranslator {
}
}
}
+
+ public static Set<String> knownUrns() { // keys of TRANSLATORS; SamzaPipelineRunner passes them to TrivialNativeTransformExpander.forKnownUrns
+ return TRANSLATORS.keySet(); // NOTE(review): returns the map's key set itself, not a copy; if TRANSLATORS is mutable, callers could alter the registry - consider an unmodifiable view
+ }
+
+ /** Registers the Samza portable translators for the GroupByKey, Flatten, Impulse and ExecutableStage URNs; discovered through @AutoService and ServiceLoader. */
+ @AutoService(SamzaPortableTranslatorRegistrar.class)
+ public static class SamzaTranslators implements SamzaPortableTranslatorRegistrar {
+
+ @Override
+ public Map<String, TransformTranslator<?>> getTransformTranslators() {
+ return ImmutableMap.<String, TransformTranslator<?>>builder()
+ .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslator<>())
+ .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionsTranslator<>())
+ .put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new ImpulseTranslator())
+ .put(ExecutableStage.URN, new ParDoBoundMultiTranslator<>())
+ .build();
+ }
+ }
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortableTranslatorRegistrar.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortableTranslatorRegistrar.java
new file mode 100644
index 0000000..5eede8f
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortableTranslatorRegistrar.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import java.util.Map;
+
+/** Service-loaded registrar supplying {@link TransformTranslator}s for the Samza portable pipeline, keyed by transform URN. */
+public interface SamzaPortableTranslatorRegistrar {
+ Map<String, TransformTranslator<?>> getTransformTranslators(); // transform URN -> translator for that URN
+}
diff --git a/sdks/python/apache_beam/runners/portability/samza_runner_test.py b/sdks/python/apache_beam/runners/portability/samza_runner_test.py
index bcb44e5..93a0418 100644
--- a/sdks/python/apache_beam/runners/portability/samza_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/samza_runner_test.py
@@ -137,6 +137,12 @@ class SamzaRunnerTest(portable_runner_test.PortableRunnerTest):
# Skip until Samza portable runner supports distribution metrics.
raise unittest.SkipTest("BEAM-12614")
+ def test_flattened_side_input(self):
+ # Run the base test with transcoding disabled; the Samza runner does not yet
+ # support transcoding, see https://issues.apache.org/jira/browse/BEAM-12681
+ super(SamzaRunnerTest,
+ self).test_flattened_side_input(with_transcoding=False)
+
def test_pack_combiners(self):
# Stages produced by translations.pack_combiners are fused
# by translations.greedily_fuse, which prevent the stages