You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/05/14 16:14:26 UTC
[beam] branch release-2.21.0 updated: [BEAM-9001,
BEAM-6327] Ensure that all transforms (except for required runner
implemented transforms) have an environment id. (#11670)
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.21.0 by this push:
new d87c0ab [BEAM-9001, BEAM-6327] Ensure that all transforms (except for required runner implemented transforms) have an environment id. (#11670)
new e859735 Merge pull request #11703 from ibzib/BEAM-9001
d87c0ab is described below
commit d87c0abc51815736c190f05530eca880e6b394ea
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Wed May 13 15:45:16 2020 -0700
[BEAM-9001, BEAM-6327] Ensure that all transforms (except for required runner implemented transforms) have an environment id. (#11670)
* [BEAM-9001, BEAM-6327] Ensure that all transforms (except for required runner implemented transforms) have an environment id.
* fixup! Fix native transform expander to not reinsert deleted transforms.
* fixup! Address chamikara's PR comments
---
.../pipeline/src/main/proto/beam_runner_api.proto | 18 ++++++---
.../core/construction/PTransformTranslation.java | 40 +++++++++++--------
.../core/construction/graph/PipelineValidator.java | 9 +++++
...er.java => TrivialNativeTransformExpander.java} | 45 ++++++++++++----------
.../construction/graph/QueryablePipelineTest.java | 6 ++-
.../beam/runners/flink/FlinkPipelineRunner.java | 5 ++-
.../FlinkStreamingPortablePipelineTranslator.java | 2 +-
.../beam/runners/spark/SparkPipelineRunner.java | 5 ++-
sdks/go/pkg/beam/core/runtime/graphx/translate.go | 22 ++++++-----
sdks/python/apache_beam/pipeline.py | 24 +++++-------
10 files changed, 106 insertions(+), 70 deletions(-)
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 0b8d651..2bef4ee 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -151,8 +151,9 @@ message PTransform {
// details.
FunctionSpec spec = 1;
- // (Optional) if this node is a composite, a list of the ids of
- // transforms that it contains.
+ // (Optional) A list of the ids of transforms that it contains.
+ //
+ // Primitive transforms are not allowed to specify this.
repeated string subtransforms = 2;
// (Required) A map from local names of inputs (unique only with this map, and
@@ -184,9 +185,10 @@ message PTransform {
// there is none, it may be omitted.
repeated DisplayData display_data = 6;
- // (Optional) Environment where the current PTransform should be executed in.
- // Runner that executes the pipeline may choose to override this if needed. If
- // not specified, environment will be decided by the runner.
+ // Environment in which the current PTransform should be executed.
+ //
+ // Transforms that are required to be implemented by a runner must omit this.
+ // All other transforms are required to specify this.
string environment_id = 7;
}
@@ -227,12 +229,18 @@ message StandardPTransforms {
// See https://beam.apache.org/documentation/programming-guide/#groupbykey
// for additional details.
//
+ // Never defines an environment as the runner is required to implement this
+ // transform.
+ //
// Payload: None
GROUP_BY_KEY = 2 [(beam_urn) = "beam:transform:group_by_key:v1"];
// A transform which produces a single empty byte array at the minimum
// timestamp in the GlobalWindow.
//
+ // Never defines an environment as the runner is required to implement this
+ // transform.
+ //
// Payload: None
IMPULSE = 3 [(beam_urn) = "beam:transform:impulse:v1"];
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 1ff1d69..cf3a7e5 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -50,8 +50,8 @@ import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSortedSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
@@ -74,6 +74,10 @@ public class PTransformTranslation {
public static final String MAP_WINDOWS_TRANSFORM_URN = "beam:transform:map_windows:v1";
public static final String MERGE_WINDOWS_TRANSFORM_URN = "beam:transform:merge_windows:v1";
+ // Required runner implemented transforms. These transforms should never specify an environment.
+ public static final ImmutableSet<String> RUNNER_IMPLEMENTED_TRANSFORMS =
+ ImmutableSet.of(GROUP_BY_KEY_TRANSFORM_URN, IMPULSE_TRANSFORM_URN);
+
// DeprecatedPrimitives
/**
* @deprecated SDKs should move away from creating `Read` transforms and migrate to using Impulse
@@ -350,10 +354,15 @@ public class PTransformTranslation {
// A composite transform is permitted to have a null spec. There are also some pseudo-
// primitives not yet supported by the portability framework that have null specs
+ String urn = "";
if (spec != null) {
+ urn = spec.getUrn();
transformBuilder.setSpec(spec);
}
- transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
+
+ if (!RUNNER_IMPLEMENTED_TRANSFORMS.contains(urn)) {
+ transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
+ }
return transformBuilder.build();
}
}
@@ -367,11 +376,6 @@ public class PTransformTranslation {
private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
- // TODO: BEAM-9001 - set environment ID in all transforms and allow runners to override.
- private static List<String> sdkTransformsWithEnvironment =
- ImmutableList.of(
- PAR_DO_TRANSFORM_URN, COMBINE_PER_KEY_TRANSFORM_URN, ASSIGN_WINDOWS_TRANSFORM_URN);
-
private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
loadTransformPayloadTranslators() {
HashMap<Class<? extends PTransform>, TransformPayloadTranslator> translators =
@@ -423,14 +427,20 @@ public class PTransformTranslation {
if (spec != null) {
transformBuilder.setSpec(spec);
- if (sdkTransformsWithEnvironment.contains(spec.getUrn())) {
- transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
- } else if (spec.getUrn().equals(READ_TRANSFORM_URN)
- && (appliedPTransform.getTransform().getClass() == Read.Bounded.class)) {
- // Only assigning environment to Bounded reads. Not assigning an environment to Unbounded
- // reads since they are a Runner translated transform, unless, in the future, we have an
- // adapter available for splittable DoFn.
- transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
+ // Required runner implemented transforms should not have an environment id.
+ if (!RUNNER_IMPLEMENTED_TRANSFORMS.contains(spec.getUrn())) {
+ // TODO(BEAM-9309): Remove existing hacks around deprecated READ transform.
+ if (spec.getUrn().equals(READ_TRANSFORM_URN)) {
+ // Only assigning environment to Bounded reads.
+ // Not assigning an environment to Unbounded reads since they are a Runner
+ // translated transform, unless, in the future, we have an adapter available for
+ // splittable DoFn.
+ if (appliedPTransform.getTransform().getClass() == Read.Bounded.class) {
+ transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
+ }
+ } else {
+ transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
+ }
}
}
return transformBuilder.build();
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
index 34e74a7..a27b220 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
@@ -208,6 +208,15 @@ public class PipelineValidator {
}
String urn = transform.getSpec().getUrn();
+ if (PTransformTranslation.RUNNER_IMPLEMENTED_TRANSFORMS.contains(urn)) {
+ checkArgument(
+ transform.getEnvironmentId().isEmpty(),
+ "Transform %s references environment %s when no environment should be specified since it is a required runner implemented transform %s.",
+ id,
+ transform.getEnvironmentId(),
+ urn);
+ }
+
if (VALIDATORS.containsKey(urn)) {
try {
VALIDATORS.get(urn).validate(id, transform, components, requirements);
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java
similarity index 54%
rename from runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java
rename to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java
index 894a5d1..8ad95e8 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java
@@ -23,20 +23,22 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-// TODO(BEAM-6327): Remove the need for this.
-
-/** PipelineTrimmer removes subcomponents of native transforms that shouldn't be fused. */
-public class PipelineTrimmer {
- private static final Logger LOG = LoggerFactory.getLogger(PipelineTrimmer.class);
+/**
+ * TrivialNativeTransformExpander is used to replace transforms with known URNs with their native
+ * equivalent.
+ */
+public class TrivialNativeTransformExpander {
+ private static final Logger LOG = LoggerFactory.getLogger(TrivialNativeTransformExpander.class);
/**
- * Remove subcomponents of native transforms that shouldn't be fused.
+ * Replaces each transform that has a known URN with its native equivalent, stripping the
+ * environment and removing any sub-transforms from the returned pipeline.
*
* @param pipeline the pipeline to be trimmed
* @param knownUrns set of URNs for the runner's native transforms
* @return the trimmed pipeline
*/
- public static Pipeline trim(Pipeline pipeline, Set<String> knownUrns) {
+ public static Pipeline forKnownUrns(Pipeline pipeline, Set<String> knownUrns) {
return makeKnownUrnsPrimitives(pipeline, knownUrns);
}
@@ -44,26 +46,29 @@ public class PipelineTrimmer {
RunnerApi.Pipeline pipeline, Set<String> knownUrns) {
RunnerApi.Pipeline.Builder trimmedPipeline = pipeline.toBuilder();
for (String ptransformId : pipeline.getComponents().getTransformsMap().keySet()) {
- if (knownUrns.contains(
- pipeline.getComponents().getTransformsOrThrow(ptransformId).getSpec().getUrn())) {
- LOG.debug("Removing descendants of known PTransform {}" + ptransformId);
+ // Skip over previously removed transforms from the original pipeline since we iterate
+ // over all transforms from the original pipeline and not the trimmed down version.
+ RunnerApi.PTransform currentTransform =
+ trimmedPipeline.getComponents().getTransformsOrDefault(ptransformId, null);
+ if (currentTransform != null && knownUrns.contains(currentTransform.getSpec().getUrn())) {
+ LOG.debug(
+ "Removing descendants and environment of known native PTransform {}" + ptransformId);
removeDescendants(trimmedPipeline, ptransformId);
+ trimmedPipeline
+ .getComponentsBuilder()
+ .putTransforms(
+ ptransformId,
+ currentTransform.toBuilder().clearSubtransforms().clearEnvironmentId().build());
}
}
return trimmedPipeline.build();
}
private static void removeDescendants(RunnerApi.Pipeline.Builder pipeline, String parentId) {
- RunnerApi.PTransform parentProto =
- pipeline.getComponents().getTransformsOrDefault(parentId, null);
- if (parentProto != null) {
- for (String childId : parentProto.getSubtransformsList()) {
- removeDescendants(pipeline, childId);
- pipeline.getComponentsBuilder().removeTransforms(childId);
- }
- pipeline
- .getComponentsBuilder()
- .putTransforms(parentId, parentProto.toBuilder().clearSubtransforms().build());
+ RunnerApi.PTransform parentProto = pipeline.getComponents().getTransformsOrThrow(parentId);
+ for (String childId : parentProto.getSubtransformsList()) {
+ removeDescendants(pipeline, childId);
+ pipeline.getComponentsBuilder().removeTransforms(childId);
}
}
}
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java
index 3b041b4..ff797ef 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java
@@ -50,9 +50,11 @@ import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
@@ -342,7 +344,7 @@ public class QueryablePipelineTest {
public void getEnvironmentWithEnvironment() {
Pipeline p = Pipeline.create();
PCollection<Long> longs = p.apply("BoundedRead", Read.from(CountingSource.upTo(100L)));
- PCollectionList.of(longs).and(longs).and(longs).apply("flatten", Flatten.pCollections());
+ longs.apply(WithKeys.of("a")).apply("groupByKey", GroupByKey.create());
Components components = PipelineTranslation.toProto(p).getComponents();
QueryablePipeline qp = QueryablePipeline.forPrimitivesIn(components);
@@ -350,7 +352,7 @@ public class QueryablePipelineTest {
PTransformNode environmentalRead =
PipelineNode.pTransform("BoundedRead", components.getTransformsOrThrow("BoundedRead"));
PTransformNode nonEnvironmentalTransform =
- PipelineNode.pTransform("flatten", components.getTransformsOrThrow("flatten"));
+ PipelineNode.pTransform("groupByKey", components.getTransformsOrThrow("groupByKey"));
assertThat(qp.getEnvironment(environmentalRead).isPresent(), is(true));
assertThat(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index 0729f25..8945d9d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -30,9 +30,9 @@ import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
-import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
+import org.apache.beam.runners.core.construction.graph.TrivialNativeTransformExpander;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineJarUtils;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineResult;
@@ -98,7 +98,8 @@ public class FlinkPipelineRunner implements PortablePipelineRunner {
// Don't let the fuser fuse any subcomponents of native transforms.
Pipeline trimmedPipeline =
- PipelineTrimmer.trim(pipelineWithSdfExpanded, translator.knownUrns());
+ TrivialNativeTransformExpander.forKnownUrns(
+ pipelineWithSdfExpanded, translator.knownUrns());
// Fused pipeline proto.
// TODO: Consider supporting partially-fused graphs.
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 5494749..f041d0c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -228,7 +228,7 @@ public class FlinkStreamingPortablePipelineTranslator
@Override
public Set<String> knownUrns() {
- // Do not expose Read as a known URN because PipelineTrimmer otherwise removes
+ // Do not expose Read as a known URN because TrivialNativeTransformExpander otherwise removes
// the subtransforms which are added in case of bounded reads. We only have a
// translator here for unbounded Reads which are native transforms which do not
// have subtransforms. Unbounded Reads are used by cross-language transforms, e.g.
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index e5166a3..9727757 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -30,9 +30,9 @@ import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
-import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
+import org.apache.beam.runners.core.construction.graph.TrivialNativeTransformExpander;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineJarUtils;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineResult;
@@ -82,7 +82,8 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
// Don't let the fuser fuse any subcomponents of native transforms.
Pipeline trimmedPipeline =
- PipelineTrimmer.trim(pipelineWithSdfExpanded, translator.knownUrns());
+ TrivialNativeTransformExpander.forKnownUrns(
+ pipelineWithSdfExpanded, translator.knownUrns());
// Fused pipeline proto.
// TODO: Consider supporting partially-fused graphs.
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 9ff84c2..9441151 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -177,6 +177,7 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
transform := &pipepb.PTransform{
UniqueName: s.Scope.Name,
Subtransforms: subtransforms,
+ EnvironmentId: m.addDefaultEnv(),
}
m.updateIfCombineComposite(s, transform)
@@ -208,7 +209,6 @@ func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pipepb.PT
AccumulatorCoderId: acID,
}
transform.Spec = &pipepb.FunctionSpec{Urn: URNCombinePerKey, Payload: protox.MustEncode(payload)}
- transform.EnvironmentId = m.addDefaultEnv()
}
func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
@@ -238,10 +238,8 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
// allPIds tracks additional PTransformIDs generated for the pipeline
var allPIds []string
var spec *pipepb.FunctionSpec
- var transformEnvID = ""
switch edge.Edge.Op {
case graph.Impulse:
- // TODO(herohde) 7/18/2018: Encode data?
spec = &pipepb.FunctionSpec{Urn: URNImpulse}
case graph.ParDo:
@@ -315,7 +313,6 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
if edge.Edge.DoFn.IsSplittable() {
payload.RestrictionCoderId = m.coders.Add(edge.Edge.RestrictionCoder)
}
- transformEnvID = m.addDefaultEnv()
spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}
case graph.Combine:
@@ -325,7 +322,6 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
Payload: []byte(mustEncodeMultiEdgeBase64(edge.Edge)),
},
}
- transformEnvID = m.addDefaultEnv()
spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}
case graph.Flatten:
@@ -347,6 +343,11 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
panic(fmt.Sprintf("Unexpected opcode: %v", edge.Edge.Op))
}
+ var transformEnvID = ""
+ if !(spec.Urn == URNGBK || spec.Urn == URNImpulse) {
+ transformEnvID = m.addDefaultEnv()
+ }
+
transform := &pipepb.PTransform{
UniqueName: edge.Name,
Spec: spec,
@@ -413,10 +414,11 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
flattenID := fmt.Sprintf("%v_flatten", id)
flatten := &pipepb.PTransform{
- UniqueName: flattenID,
- Spec: &pipepb.FunctionSpec{Urn: URNFlatten},
- Inputs: inputs,
- Outputs: map[string]string{"i0": out},
+ UniqueName: flattenID,
+ Spec: &pipepb.FunctionSpec{Urn: URNFlatten},
+ Inputs: inputs,
+ Outputs: map[string]string{"i0": out},
+ EnvironmentId: m.addDefaultEnv(),
}
m.transforms[flattenID] = flatten
subtransforms = append(subtransforms, flattenID)
@@ -468,6 +470,7 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
m.transforms[cogbkID] = &pipepb.PTransform{
UniqueName: edge.Name,
Subtransforms: subtransforms,
+ EnvironmentId: m.addDefaultEnv(),
}
return cogbkID
}
@@ -632,6 +635,7 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string {
Spec: &pipepb.FunctionSpec{
Urn: URNReshuffle,
},
+ EnvironmentId: m.addDefaultEnv(),
}
return reshuffleID
}
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index bb55486..e6aef62 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -119,20 +119,16 @@ class Pipeline(object):
should be used to designate new names
(e.g. ``input | "label" >> my_transform``).
"""
-
- # TODO: BEAM-9001 - set environment ID in all transforms and allow runners to
- # override.
@classmethod
- def sdk_transforms_with_environment(cls):
+ def runner_implemented_transforms(cls):
# type: () -> FrozenSet[str]
- from apache_beam.runners.portability.fn_api_runner import translations
- sets = [
- translations.PAR_DO_URNS,
- translations.COMBINE_URNS,
- frozenset([common_urns.primitives.ASSIGN_WINDOWS.urn])
- ]
- result = frozenset() # type: FrozenSet[str]
- return result.union(*sets)
+
+ # This set should only contain transforms which are required to be
+ # implemented by a runner.
+ return frozenset([
+ common_urns.primitives.GROUP_BY_KEY.urn,
+ common_urns.primitives.IMPULSE.urn,
+ ])
def __init__(self, runner=None, options=None, argv=None):
# type: (Optional[Union[str, PipelineRunner]], Optional[PipelineOptions], Optional[List[str]]) -> None
@@ -1117,8 +1113,8 @@ class AppliedPTransform(object):
transform_spec = transform_to_runner_api(self.transform, context)
environment_id = self.environment_id
transform_urn = transform_spec.urn if transform_spec else None
- if (not environment_id and transform_urn and
- (transform_urn in Pipeline.sdk_transforms_with_environment())):
+ if (not environment_id and
+ (transform_urn not in Pipeline.runner_implemented_transforms())):
environment_id = context.default_environment_id()
def _maybe_preserve_tag(new_tag, pc, input_tags_to_preserve):