You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/03/26 17:49:23 UTC
[beam] branch master updated: [BEAM-9340] Validate pipeline requirements in PipelineValidator.
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 342fcdb [BEAM-9340] Validate pipeline requirements in PipelineValidator.
new e8cb91f Merge pull request #11224 from robertwb/java-validate-requirements
342fcdb is described below
commit 342fcdb8b0fbaae8de286968853abd34e7d862ff
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Wed Mar 25 11:34:29 2020 -0700
[BEAM-9340] Validate pipeline requirements in PipelineValidator.
Also fix a bug where the greedy fuser was not propagating requirements.
---
.../core/construction/graph/FusedPipeline.java | 9 +++-
.../construction/graph/GreedyPipelineFuser.java | 13 ++++--
.../core/construction/graph/PipelineValidator.java | 51 ++++++++++++++++------
.../graph/GreedyPipelineFuserTest.java | 19 ++++++--
4 files changed, 71 insertions(+), 21 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
index a67c36d..f9af790 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
@@ -38,8 +38,9 @@ public abstract class FusedPipeline {
static FusedPipeline of(
Components components,
Set<ExecutableStage> environmentalStages,
- Set<PTransformNode> runnerStages) {
- return new AutoValue_FusedPipeline(components, environmentalStages, runnerStages);
+ Set<PTransformNode> runnerStages,
+ Set<String> requirements) {
+ return new AutoValue_FusedPipeline(components, environmentalStages, runnerStages, requirements);
}
abstract Components getComponents();
@@ -50,6 +51,9 @@ public abstract class FusedPipeline {
/** The {@link PTransform PTransforms} that a runner is responsible for executing. */
public abstract Set<PTransformNode> getRunnerExecutedTransforms();
+ /** The requirement URNs declared by the original pipeline, propagated to the fused pipeline. */
+ public abstract Set<String> getRequirements();
+
/**
* Returns the {@link RunnerApi.Pipeline} representation of this {@link FusedPipeline}.
*
@@ -84,6 +88,7 @@ public abstract class FusedPipeline {
Pipeline.newBuilder()
.setComponents(fusedComponents)
.addAllRootTransformIds(rootTransformIds)
+ .addAllRequirements(getRequirements())
.build();
// Validate that fusion didn't produce a malformed pipeline.
PipelineValidator.validate(res);
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
index 6184498..dbfe6c6 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
@@ -46,6 +46,7 @@ import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNo
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.slf4j.Logger;
@@ -72,7 +73,11 @@ public class GreedyPipelineFuser {
unfusedRootNodes.addAll(descendants.getUnfusedNodes());
rootConsumers.addAll(descendants.getFusibleConsumers());
}
- this.fusedPipeline = fusePipeline(unfusedRootNodes, groupSiblings(rootConsumers));
+ this.fusedPipeline =
+ fusePipeline(
+ unfusedRootNodes,
+ groupSiblings(rootConsumers),
+ ImmutableSet.copyOf(p.getRequirementsList()));
}
/**
@@ -114,7 +119,8 @@ public class GreedyPipelineFuser {
*/
private FusedPipeline fusePipeline(
Collection<PTransformNode> initialUnfusedTransforms,
- NavigableSet<NavigableSet<CollectionConsumer>> initialConsumers) {
+ NavigableSet<NavigableSet<CollectionConsumer>> initialConsumers,
+ Set<String> requirements) {
Map<CollectionConsumer, ExecutableStage> consumedCollectionsAndTransforms = new HashMap<>();
Set<ExecutableStage> stages = new LinkedHashSet<>();
Set<PTransformNode> unfusedTransforms = new LinkedHashSet<>(initialUnfusedTransforms);
@@ -174,7 +180,8 @@ public class GreedyPipelineFuser {
deduplicated
.getDeduplicatedTransforms()
.getOrDefault(transform.getId(), transform))
- .collect(Collectors.toSet())));
+ .collect(Collectors.toSet())),
+ requirements);
}
private DescendantConsumers getRootConsumers(PTransformNode rootNode) {
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
index c0ffb5c..c80947b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction.graph;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import java.util.Map;
+import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
@@ -31,7 +32,9 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.TestStreamPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy;
import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
/**
@@ -42,7 +45,9 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
public class PipelineValidator {
@FunctionalInterface
private interface TransformValidator {
- void validate(String transformId, PTransform transform, Components components) throws Exception;
+ void validate(
+ String transformId, PTransform transform, Components components, Set<String> requirements)
+ throws Exception;
}
private static final ImmutableMap<String, TransformValidator> VALIDATORS =
@@ -95,10 +100,11 @@ public class PipelineValidator {
transformId);
}
- validateComponents("pipeline", components);
+ validateComponents("pipeline", components, ImmutableSet.copyOf(p.getRequirementsList()));
}
- private static void validateComponents(String context, Components components) {
+ private static void validateComponents(
+ String context, Components components, Set<String> requirements) {
{
Map<String, String> uniqueNamesById = Maps.newHashMap();
for (String transformId : components.getTransformsMap().keySet()) {
@@ -114,7 +120,7 @@ public class PipelineValidator {
transformId,
previousId,
transform.getUniqueName());
- validateTransform(transformId, transform, components);
+ validateTransform(transformId, transform, components, requirements);
}
}
{
@@ -172,7 +178,8 @@ public class PipelineValidator {
}
}
- private static void validateTransform(String id, PTransform transform, Components components) {
+ private static void validateTransform(
+ String id, PTransform transform, Components components, Set<String> requirements) {
for (String subtransformId : transform.getSubtransformsList()) {
checkArgument(
components.containsTransforms(subtransformId),
@@ -203,14 +210,15 @@ public class PipelineValidator {
String urn = transform.getSpec().getUrn();
if (VALIDATORS.containsKey(urn)) {
try {
- VALIDATORS.get(urn).validate(id, transform, components);
+ VALIDATORS.get(urn).validate(id, transform, components, requirements);
} catch (Exception e) {
throw new RuntimeException(String.format("Failed to validate transform %s", id), e);
}
}
}
- private static void validateParDo(String id, PTransform transform, Components components)
+ private static void validateParDo(
+ String id, PTransform transform, Components components, Set<String> requirements)
throws Exception {
ParDoPayload payload = ParDoPayload.parseFrom(transform.getSpec().getPayload());
// side_inputs
@@ -221,23 +229,39 @@ public class PipelineValidator {
id,
sideInputId);
}
- // TODO: Validate state_specs and timer_specs
+ if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
+ checkArgument(requirements.contains(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN));
+ // TODO: Validate state_specs and timer_specs
+ }
if (!payload.getRestrictionCoderId().isEmpty()) {
checkArgument(components.containsCoders(payload.getRestrictionCoderId()));
+ checkArgument(requirements.contains(ParDoTranslation.REQUIRES_SPLITTABLE_DOFN_URN));
+ }
+ if (payload.getRequestsFinalization()) {
+ checkArgument(requirements.contains(ParDoTranslation.REQUIRES_BUNDLE_FINALIZATION_URN));
+ }
+ if (payload.getRequiresStableInput()) {
+ checkArgument(requirements.contains(ParDoTranslation.REQUIRES_STABLE_INPUT_URN));
+ }
+ if (payload.getRequiresTimeSortedInput()) {
+ checkArgument(requirements.contains(ParDoTranslation.REQUIRES_TIME_SORTED_INPUT_URN));
}
}
- private static void validateAssignWindows(String id, PTransform transform, Components components)
+ private static void validateAssignWindows(
+ String id, PTransform transform, Components components, Set<String> requirements)
throws Exception {
WindowIntoPayload.parseFrom(transform.getSpec().getPayload());
}
- private static void validateTestStream(String id, PTransform transform, Components components)
+ private static void validateTestStream(
+ String id, PTransform transform, Components components, Set<String> requirements)
throws Exception {
TestStreamPayload.parseFrom(transform.getSpec().getPayload());
}
- private static void validateCombine(String id, PTransform transform, Components components)
+ private static void validateCombine(
+ String id, PTransform transform, Components components, Set<String> requirements)
throws Exception {
CombinePayload payload = CombinePayload.parseFrom(transform.getSpec().getPayload());
checkArgument(
@@ -247,7 +271,8 @@ public class PipelineValidator {
}
private static void validateExecutableStage(
- String id, PTransform transform, Components outerComponents) throws Exception {
+ String id, PTransform transform, Components outerComponents, Set<String> requirements)
+ throws Exception {
ExecutableStagePayload payload =
ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
@@ -278,7 +303,7 @@ public class PipelineValidator {
outputId);
}
- validateComponents("ExecutableStage " + id, components);
+ validateComponents("ExecutableStage " + id, components, requirements);
// TODO: Also validate that side inputs of all transforms within components.getTransforms()
// are contained within payload.getSideInputsList()
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
index 0a32f9e..97b8091 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
@@ -48,6 +48,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
import org.hamcrest.Matchers;
@@ -980,7 +981,11 @@ public class GreedyPipelineFuserTest {
.putEnvironments("common", Environments.createDockerEnvironment("common"))
.build();
FusedPipeline fused =
- GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build());
+ GreedyPipelineFuser.fuse(
+ Pipeline.newBuilder()
+ .setComponents(components)
+ .addRequirements(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN)
+ .build());
assertThat(
fused.getRunnerExecutedTransforms(),
@@ -1054,7 +1059,11 @@ public class GreedyPipelineFuserTest {
.build();
FusedPipeline fused =
- GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build());
+ GreedyPipelineFuser.fuse(
+ Pipeline.newBuilder()
+ .setComponents(components)
+ .addRequirements(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN)
+ .build());
assertThat(
fused.getRunnerExecutedTransforms(),
@@ -1107,7 +1116,11 @@ public class GreedyPipelineFuserTest {
.build();
FusedPipeline fused =
- GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build());
+ GreedyPipelineFuser.fuse(
+ Pipeline.newBuilder()
+ .setComponents(components)
+ .addRequirements(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN)
+ .build());
assertThat(
fused.getRunnerExecutedTransforms(),