You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/04 17:09:04 UTC
[2/2] beam git commit: Use Batch Replacement API in the Dataflow Runner
Use Batch Replacement API in the Dataflow Runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7efee543
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7efee543
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7efee543
Branch: refs/heads/master
Commit: 7efee543abc6ec9c74578f454c4963912356ab1d
Parents: db3144b
Author: Thomas Groh <tg...@google.com>
Authored: Thu Mar 23 09:38:53 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 4 10:08:53 2017 -0700
----------------------------------------------------------------------
.../dataflow/DataflowPipelineTranslator.java | 5 +-
.../beam/runners/dataflow/DataflowRunner.java | 169 +++++++++++--------
.../main/java/org/apache/beam/sdk/Pipeline.java | 3 +-
3 files changed, 102 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7efee543/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 9b80756..ab9df70 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -408,7 +408,10 @@ public class DataflowPipelineTranslator {
PTransform<?, ?> transform = node.getTransform();
TransformTranslator translator = getTransformTranslator(transform.getClass());
checkState(
- translator != null, "no translator registered for primitive transform %s", transform);
+ translator != null,
+ "no translator registered for primitive transform %s at node %s",
+ transform,
+ node.getFullName());
LOG.debug("Translating {}", transform);
currentTransform = node.toAppliedPTransform();
translator.translate(transform, this);
http://git-wip-us.apache.org/repos/asf/beam/blob/7efee543/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 6eec8f8..a2b715f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -37,6 +37,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.base.Utf8;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -64,6 +65,7 @@ import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.runners.core.construction.UnsupportedOverrideFactory;
+import org.apache.beam.runners.dataflow.BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
@@ -91,7 +93,6 @@ import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
@@ -289,96 +290,123 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
}
- private Map<PTransformMatcher, PTransformOverrideFactory> getOverrides(boolean streaming) {
- ImmutableMap.Builder<PTransformMatcher, PTransformOverrideFactory> ptoverrides =
- ImmutableMap.builder();
+ private List<PTransformOverride> getOverrides(boolean streaming) {
+ ImmutableList.Builder<PTransformOverride> overridesBuilder = ImmutableList.builder();
// Create is implemented in terms of a Read, so it must precede the override to Read in
// streaming
- ptoverrides
- .put(PTransformMatchers.flattenWithDuplicateInputs(), DeduplicatedFlattenFactory.create())
- .put(PTransformMatchers.emptyFlatten(), EmptyFlattenAsCreateFactory.instance());
+ overridesBuilder
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.flattenWithDuplicateInputs(),
+ DeduplicatedFlattenFactory.create()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.emptyFlatten(), EmptyFlattenAsCreateFactory.instance()));
if (streaming) {
// In streaming mode must use either the custom Pubsub unbounded source/sink or
// defer to Windmill's built-in implementation.
for (Class<? extends DoFn> unsupported :
ImmutableSet.of(PubsubBoundedReader.class, PubsubBoundedWriter.class)) {
- ptoverrides.put(
- PTransformMatchers.parDoWithFnType(unsupported),
- UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, true)));
+ overridesBuilder.add(
+ PTransformOverride.of(
+ PTransformMatchers.parDoWithFnType(unsupported),
+ UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, true))));
}
if (!hasExperiment(options, "enable_custom_pubsub_source")) {
- ptoverrides.put(
- PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
- new ReflectiveRootOverrideFactory(StreamingPubsubIORead.class, this));
+ overridesBuilder.add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
+ new ReflectiveRootOverrideFactory(StreamingPubsubIORead.class, this)));
}
if (!hasExperiment(options, "enable_custom_pubsub_sink")) {
- ptoverrides.put(
- PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
- new StreamingPubsubIOWriteOverrideFactory(this));
+ overridesBuilder.add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
+ new StreamingPubsubIOWriteOverrideFactory(this)));
}
- ptoverrides
- .put(
+ overridesBuilder
+ .add(
// Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
// must precede it
- PTransformMatchers.classEqualTo(Read.Bounded.class),
- new ReflectiveRootOverrideFactory(StreamingBoundedRead.class, this))
- .put(
- PTransformMatchers.classEqualTo(Read.Unbounded.class),
- new ReflectiveRootOverrideFactory(StreamingUnboundedRead.class, this))
- .put(
- PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
- new StreamingCreatePCollectionViewFactory());
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Read.Bounded.class),
+ new ReflectiveRootOverrideFactory(StreamingBoundedRead.class, this)))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Read.Unbounded.class),
+ new ReflectiveRootOverrideFactory(StreamingUnboundedRead.class, this)))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
+ new StreamingCreatePCollectionViewFactory()));
} else {
// In batch mode must use the custom Pubsub bounded source/sink.
for (Class<? extends PTransform> unsupported :
ImmutableSet.of(PubsubUnboundedSink.class, PubsubUnboundedSource.class)) {
- ptoverrides.put(
- PTransformMatchers.classEqualTo(unsupported),
- UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, false)));
+ overridesBuilder.add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(unsupported),
+ UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, false))));
}
- ptoverrides
+ overridesBuilder
// State and timer pardos are implemented by expansion to GBK-then-ParDo
- .put(
- PTransformMatchers.stateOrTimerParDoMulti(),
- BatchStatefulParDoOverrides.multiOutputOverrideFactory())
- .put(
- PTransformMatchers.stateOrTimerParDoSingle(),
- BatchStatefulParDoOverrides.singleOutputOverrideFactory())
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.stateOrTimerParDoMulti(),
+ BatchStatefulParDoOverrides.multiOutputOverrideFactory()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.stateOrTimerParDoSingle(),
+ BatchStatefulParDoOverrides.singleOutputOverrideFactory()))
// Write uses views internally
- .put(PTransformMatchers.classEqualTo(Write.class), new BatchWriteFactory(this))
- .put(
- PTransformMatchers.classEqualTo(View.AsMap.class),
- new ReflectiveOneToOneOverrideFactory(BatchViewOverrides.BatchViewAsMap.class, this))
- .put(
- PTransformMatchers.classEqualTo(View.AsMultimap.class),
- new ReflectiveOneToOneOverrideFactory(
- BatchViewOverrides.BatchViewAsMultimap.class, this))
- .put(
- PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
- new BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory(this))
- .put(
- PTransformMatchers.classEqualTo(View.AsSingleton.class),
- new ReflectiveOneToOneOverrideFactory(
- BatchViewOverrides.BatchViewAsSingleton.class, this))
- .put(
- PTransformMatchers.classEqualTo(View.AsList.class),
- new ReflectiveOneToOneOverrideFactory(BatchViewOverrides.BatchViewAsList.class, this))
- .put(
- PTransformMatchers.classEqualTo(View.AsIterable.class),
- new ReflectiveOneToOneOverrideFactory(
- BatchViewOverrides.BatchViewAsIterable.class, this));
- }
- ptoverrides
- .put(PTransformMatchers.classEqualTo(Reshuffle.class), new ReshuffleOverrideFactory())
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Write.class), new BatchWriteFactory(this)))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+ new BatchCombineGloballyAsSingletonViewFactory(this)))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsMap.class),
+ new ReflectiveOneToOneOverrideFactory(
+ BatchViewOverrides.BatchViewAsMap.class, this)))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsMultimap.class),
+ new ReflectiveOneToOneOverrideFactory(
+ BatchViewOverrides.BatchViewAsMultimap.class, this)))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsSingleton.class),
+ new ReflectiveOneToOneOverrideFactory(
+ BatchViewOverrides.BatchViewAsSingleton.class, this)))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsList.class),
+ new ReflectiveOneToOneOverrideFactory(
+ BatchViewOverrides.BatchViewAsList.class, this)))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsIterable.class),
+ new ReflectiveOneToOneOverrideFactory(
+ BatchViewOverrides.BatchViewAsIterable.class, this)));
+ }
+ overridesBuilder
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Reshuffle.class), new ReshuffleOverrideFactory()))
// Order is important. Streaming views almost all use Combine internally.
- .put(
- PTransformMatchers.classEqualTo(Combine.GroupedValues.class),
- new PrimitiveCombineGroupedValuesOverrideFactory())
- .put(
- PTransformMatchers.classEqualTo(ParDo.SingleOutput.class),
- new PrimitiveParDoSingleFactory());
- return ptoverrides.build();
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Combine.GroupedValues.class),
+ new PrimitiveCombineGroupedValuesOverrideFactory()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(ParDo.SingleOutput.class),
+ new PrimitiveParDoSingleFactory()));
+ return overridesBuilder.build();
}
private String getUnsupportedMessage(Class<?> unsupported, boolean streaming) {
@@ -682,10 +710,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@VisibleForTesting
void replaceTransforms(Pipeline pipeline) {
boolean streaming = options.isStreaming() || containsUnboundedPCollection(pipeline);
- for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
- getOverrides(streaming).entrySet()) {
- pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
- }
+ pipeline.replaceAll(getOverrides(streaming));
}
private boolean containsUnboundedPCollection(Pipeline p) {
http://git-wip-us.apache.org/repos/asf/beam/blob/7efee543/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index f980a0b..fa8277f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -228,8 +228,7 @@ public class Pipeline {
});
}
- public void replace(
- final PTransformOverride override) {
+ public void replace(final PTransformOverride override) {
final Collection<Node> matches = new ArrayList<>();
transforms.visit(
new PipelineVisitor.Defaults() {