You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/06/07 20:48:12 UTC
[1/3] beam git commit: [BEAM-2421] Swap to use an Impulse primitive +
DoFn for Create when executing with the Fn API.
Repository: beam
Updated Branches:
refs/heads/master 609016d70 -> caecac3b4
[BEAM-2421] Swap to use an Impulse primitive + DoFn for Create when executing with the Fn API.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1cdb80cb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1cdb80cb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1cdb80cb
Branch: refs/heads/master
Commit: 1cdb80cb6319c04fa94961c14c038a5e15736d68
Parents: 5f7e73b
Author: Luke Cwik <lc...@google.com>
Authored: Wed Jun 7 08:53:14 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jun 7 13:41:20 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 158 +++++++++++++++++--
1 file changed, 145 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1cdb80cb/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index ed29330..3e7c8ce 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -49,6 +49,7 @@ import java.net.URLClassLoader;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -57,6 +58,7 @@ import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory;
import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory;
import org.apache.beam.runners.core.construction.PTransformMatchers;
@@ -79,10 +81,12 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
@@ -103,6 +107,7 @@ import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.GroupedValues;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -113,6 +118,7 @@ import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.NameUtils;
@@ -312,6 +318,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
new StreamingPubsubIOWriteOverrideFactory(this)));
}
+ if (hasExperiment(options, "beam_fn_api")) {
+ overridesBuilder.add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Create.Values.class),
+ new StreamingFnApiCreateOverrideFactory()));
+ }
overridesBuilder
.add(
// Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
@@ -428,15 +440,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
AppliedPTransform<PBegin, PCollection<T>, PTransform<PInput, PCollection<T>>> transform) {
PTransform<PInput, PCollection<T>> original = transform.getTransform();
- PCollection<T> output =
- (PCollection) Iterables.getOnlyElement(transform.getOutputs().values());
return PTransformReplacement.of(
transform.getPipeline().begin(),
InstanceBuilder.ofType(replacement)
.withArg(DataflowRunner.class, runner)
.withArg(
(Class<? super PTransform<PInput, PCollection<T>>>) original.getClass(), original)
- .withArg((Class<? super PCollection<T>>) output.getClass(), output)
.build());
}
@@ -814,10 +823,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
/** Builds an instance of this class from the overridden transform. */
@SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public StreamingPubsubIORead(
- DataflowRunner runner,
- PubsubUnboundedSource transform,
- PCollection<PubsubMessage> originalOutput) {
+ public StreamingPubsubIORead(DataflowRunner runner, PubsubUnboundedSource transform) {
this.transform = transform;
}
@@ -986,6 +992,136 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// ================================================================================
/**
+ * A PTransform override factory which maps Create.Values PTransforms for streaming pipelines
+ * into a Dataflow specific variant.
+ */
+ private static class StreamingFnApiCreateOverrideFactory<T>
+ implements PTransformOverrideFactory<PBegin, PCollection<T>, Create.Values<T>> {
+
+ @Override
+ public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
+ AppliedPTransform<PBegin, PCollection<T>, Create.Values<T>> transform) {
+ Create.Values<T> original = transform.getTransform();
+ PCollection<T> output =
+ (PCollection) Iterables.getOnlyElement(transform.getOutputs().values());
+ return PTransformReplacement.of(
+ transform.getPipeline().begin(),
+ new StreamingFnApiCreate<>(original, output));
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
+ return ReplacementOutputs.singleton(outputs, newOutput);
+ }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link org.apache.beam.sdk.transforms.Create.Values Create.Values} for the Dataflow runner in
+ * streaming mode over the Fn API.
+ */
+ private static class StreamingFnApiCreate<T> extends PTransform<PBegin, PCollection<T>> {
+ private final Create.Values<T> transform;
+ private final PCollection<T> originalOutput;
+
+ private StreamingFnApiCreate(
+ Create.Values<T> transform,
+ PCollection<T> originalOutput) {
+ this.transform = transform;
+ this.originalOutput = originalOutput;
+ }
+
+ @Override
+ public final PCollection<T> expand(PBegin input) {
+ try {
+ PCollection<T> pc = Pipeline
+ .applyTransform(input, new Impulse(IsBounded.BOUNDED))
+ .apply(ParDo.of(DecodeAndEmitDoFn
+ .fromIterable(transform.getElements(), originalOutput.getCoder())));
+ pc.setCoder(originalOutput.getCoder());
+ return pc;
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to encode elements.", e);
+ }
+ }
+
+ /**
+ * A DoFn which stores encoded versions of elements and a representation of a Coder
+ * capable of decoding those elements.
+ *
+ * <p>TODO: BEAM-2422 - Make this a SplittableDoFn.
+ */
+ private static class DecodeAndEmitDoFn<T> extends DoFn<byte[], T> {
+ public static <T> DecodeAndEmitDoFn<T> fromIterable(Iterable<T> elements, Coder<T> elemCoder)
+ throws IOException {
+ ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder();
+ for (T element : elements) {
+ byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
+ allElementsBytes.add(bytes);
+ }
+ return new DecodeAndEmitDoFn<>(allElementsBytes.build(), elemCoder);
+ }
+
+ private final Collection<byte[]> elements;
+ private final RunnerApi.MessageWithComponents coderSpec;
+
+ private DecodeAndEmitDoFn(Collection<byte[]> elements, Coder<T> coder) throws IOException {
+ this.elements = elements;
+ this.coderSpec = CoderTranslation.toProto(coder);
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context) throws IOException {
+ Coder<T> coder =
+ (Coder) CoderTranslation.fromProto(coderSpec.getCoder(), coderSpec.getComponents());
+ for (byte[] element : elements) {
+ context.output(CoderUtils.decodeFromByteArray(coder, element));
+ }
+ }
+ }
+ }
+
+ /** The Dataflow specific override for the impulse primitive. */
+ private static class Impulse extends PTransform<PBegin, PCollection<byte[]>> {
+ private final IsBounded isBounded;
+
+ private Impulse(IsBounded isBounded) {
+ this.isBounded = isBounded;
+ }
+
+ @Override
+ public PCollection<byte[]> expand(PBegin input) {
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), WindowingStrategy.globalDefault(), isBounded);
+ }
+
+ @Override
+ protected Coder<?> getDefaultOutputCoder() {
+ return ByteArrayCoder.of();
+ }
+
+ private static class Translator implements TransformTranslator<Impulse> {
+ @Override
+ public void translate(Impulse transform, TranslationContext context) {
+ if (context.getPipelineOptions().isStreaming()) {
+ StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
+ stepContext.addInput(PropertyNames.FORMAT, "pubsub");
+ stepContext.addInput(PropertyNames.PUBSUB_SUBSCRIPTION, "_starting_signal/");
+ stepContext.addOutput(context.getOutput(transform));
+ } else {
+ throw new UnsupportedOperationException(
+ "Impulse source for batch pipelines has not been defined.");
+ }
+ }
+ }
+
+ static {
+ DataflowPipelineTranslator.registerTransformTranslator(Impulse.class, new Translator());
+ }
+ }
+
+ /**
* Specialized implementation for
* {@link org.apache.beam.sdk.io.Read.Unbounded Read.Unbounded} for the
* Dataflow runner in streaming mode.
@@ -998,9 +1134,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
/** Builds an instance of this class from the overridden transform. */
@SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public StreamingUnboundedRead(DataflowRunner runner,
- Read.Unbounded<T> transform,
- PCollection<T> originalOutput) {
+ public StreamingUnboundedRead(DataflowRunner runner, Read.Unbounded<T> transform) {
this.source = transform.getSource();
}
@@ -1115,9 +1249,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
/** Builds an instance of this class from the overridden transform. */
@SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public StreamingBoundedRead(DataflowRunner runner,
- Read.Bounded<T> transform,
- PCollection<T> originalOutput) {
+ public StreamingBoundedRead(DataflowRunner runner, Read.Bounded<T> transform) {
this.source = transform.getSource();
}
[3/3] beam git commit: [BEAM-2421] Swap to use an Impulse primitive +
DoFn for Create when executing with the Fn API.
Posted by lc...@apache.org.
[BEAM-2421] Swap to use an Impulse primitive + DoFn for Create when executing with the Fn API.
This closes #3312
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/caecac3b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/caecac3b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/caecac3b
Branch: refs/heads/master
Commit: caecac3b4acb5bfa6e36143d3868b2d80ab119da
Parents: 609016d 1cdb80c
Author: Luke Cwik <lc...@google.com>
Authored: Wed Jun 7 13:43:38 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jun 7 13:43:38 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 154 ++++++++++++++++++-
1 file changed, 146 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: This closes #2286
Posted by lc...@apache.org.
This closes #2286
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5f7e73bb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5f7e73bb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5f7e73bb
Branch: refs/heads/master
Commit: 5f7e73bbacf7096eed44002a54910a560b195801
Parents: b2de3db 8f4fa43
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Jun 7 19:43:19 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jun 7 13:41:20 2017 -0700
----------------------------------------------------------------------
.../translation/types/CoderTypeSerializer.java | 41 ++-
.../streaming/io/UnboundedSourceWrapper.java | 2 +
.../flink/streaming/TestCountingSource.java | 48 ++-
.../streaming/UnboundedSourceWrapperTest.java | 309 +++++++++++--------
.../beam/runners/dataflow/DataflowRunner.java | 24 +-
5 files changed, 269 insertions(+), 155 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5f7e73bb/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --cc runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index cce6ce7,cce6ce7..ed29330
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@@ -428,12 -428,12 +428,15 @@@ public class DataflowRunner extends Pip
public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
AppliedPTransform<PBegin, PCollection<T>, PTransform<PInput, PCollection<T>>> transform) {
PTransform<PInput, PCollection<T>> original = transform.getTransform();
++ PCollection<T> output =
++ (PCollection) Iterables.getOnlyElement(transform.getOutputs().values());
return PTransformReplacement.of(
transform.getPipeline().begin(),
InstanceBuilder.ofType(replacement)
.withArg(DataflowRunner.class, runner)
.withArg(
(Class<? super PTransform<PInput, PCollection<T>>>) original.getClass(), original)
++ .withArg((Class<? super PCollection<T>>) output.getClass(), output)
.build());
}
@@@ -809,11 -809,11 +812,12 @@@
extends PTransform<PBegin, PCollection<PubsubMessage>> {
private final PubsubUnboundedSource transform;
-- /**
-- * Builds an instance of this class from the overridden transform.
-- */
++ /** Builds an instance of this class from the overridden transform. */
++ @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
public StreamingPubsubIORead(
-- DataflowRunner runner, PubsubUnboundedSource transform) {
++ DataflowRunner runner,
++ PubsubUnboundedSource transform,
++ PCollection<PubsubMessage> originalOutput) {
this.transform = transform;
}
@@@ -992,11 -992,11 +996,11 @@@
private static class StreamingUnboundedRead<T> extends PTransform<PBegin, PCollection<T>> {
private final UnboundedSource<T, ?> source;
-- /**
-- * Builds an instance of this class from the overridden transform.
-- */
++ /** Builds an instance of this class from the overridden transform. */
@SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-- public StreamingUnboundedRead(DataflowRunner runner, Read.Unbounded<T> transform) {
++ public StreamingUnboundedRead(DataflowRunner runner,
++ Read.Unbounded<T> transform,
++ PCollection<T> originalOutput) {
this.source = transform.getSource();
}
@@@ -1111,7 -1111,7 +1115,9 @@@
/** Builds an instance of this class from the overridden transform. */
@SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-- public StreamingBoundedRead(DataflowRunner runner, Read.Bounded<T> transform) {
++ public StreamingBoundedRead(DataflowRunner runner,
++ Read.Bounded<T> transform,
++ PCollection<T> originalOutput) {
this.source = transform.getSource();
}