Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/12 16:55:18 UTC

[03/50] [abbrv] beam git commit: [BEAM-2421] Swap to use an Impulse primitive + DoFn for Create when executing with the Fn API.

[BEAM-2421] Swap to use an Impulse primitive + DoFn for Create when executing with the Fn API.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1cdb80cb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1cdb80cb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1cdb80cb

Branch: refs/heads/gearpump-runner
Commit: 1cdb80cb6319c04fa94961c14c038a5e15736d68
Parents: 5f7e73b
Author: Luke Cwik <lc...@google.com>
Authored: Wed Jun 7 08:53:14 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jun 7 13:41:20 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 158 +++++++++++++++++--
 1 file changed, 145 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
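For readers skimming the diff below, a minimal sketch of the rewrite this commit
introduces. Impulse, DecodeAndEmitDoFn, and StreamingFnApiCreate are the classes
added in the patch; the example pipeline, elements, and StringUtf8Coder are
illustrative assumptions only:

    // What a user writes (unchanged by this commit):
    Pipeline p = Pipeline.create(options);
    PCollection<String> out = p.apply(Create.of("a", "b", "c"));

    // What the Dataflow runner substitutes when the "beam_fn_api" experiment
    // is set in streaming mode (conceptual shape; see
    // StreamingFnApiCreate.expand in the diff). Impulse emits a single empty
    // byte[]; the DoFn then decodes and re-emits the pre-encoded elements:
    //
    //   p.apply(new Impulse(IsBounded.BOUNDED))
    //    .apply(ParDo.of(DecodeAndEmitDoFn.fromIterable(
    //        Arrays.asList("a", "b", "c"),      // encoded up front with...
    //        StringUtf8Coder.of())));           // ...the output's coder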


http://git-wip-us.apache.org/repos/asf/beam/blob/1cdb80cb/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index ed29330..3e7c8ce 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -49,6 +49,7 @@ import java.net.URLClassLoader;
 import java.nio.channels.Channels;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -57,6 +58,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory;
 import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
@@ -79,10 +81,12 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileSystems;
@@ -103,6 +107,7 @@ import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.GroupedValues;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -113,6 +118,7 @@ import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.NameUtils;
@@ -312,6 +318,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
                 PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
                 new StreamingPubsubIOWriteOverrideFactory(this)));
       }
+      if (hasExperiment(options, "beam_fn_api")) {
+        overridesBuilder.add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(Create.Values.class),
+                new StreamingFnApiCreateOverrideFactory()));
+      }
       overridesBuilder
           .add(
               // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
@@ -428,15 +440,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
         AppliedPTransform<PBegin, PCollection<T>, PTransform<PInput, PCollection<T>>> transform) {
       PTransform<PInput, PCollection<T>> original = transform.getTransform();
-      PCollection<T> output =
-          (PCollection) Iterables.getOnlyElement(transform.getOutputs().values());
       return PTransformReplacement.of(
           transform.getPipeline().begin(),
           InstanceBuilder.ofType(replacement)
               .withArg(DataflowRunner.class, runner)
               .withArg(
                   (Class<? super PTransform<PInput, PCollection<T>>>) original.getClass(), original)
-              .withArg((Class<? super PCollection<T>>) output.getClass(), output)
               .build());
     }
 
@@ -814,10 +823,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     /** Builds an instance of this class from the overridden transform. */
     @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public StreamingPubsubIORead(
-        DataflowRunner runner,
-        PubsubUnboundedSource transform,
-        PCollection<PubsubMessage> originalOutput) {
+    public StreamingPubsubIORead(DataflowRunner runner, PubsubUnboundedSource transform) {
       this.transform = transform;
     }
 
@@ -986,6 +992,136 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   // ================================================================================
 
   /**
+   * A PTransform override factory which maps Create.Values PTransforms for streaming pipelines
+   * into a Dataflow-specific variant.
+   */
+  private static class StreamingFnApiCreateOverrideFactory<T>
+      implements PTransformOverrideFactory<PBegin, PCollection<T>, Create.Values<T>> {
+
+    @Override
+    public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
+        AppliedPTransform<PBegin, PCollection<T>, Create.Values<T>> transform) {
+      Create.Values<T> original = transform.getTransform();
+      PCollection<T> output =
+          (PCollection) Iterables.getOnlyElement(transform.getOutputs().values());
+      return PTransformReplacement.of(
+          transform.getPipeline().begin(),
+          new StreamingFnApiCreate<>(original, output));
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link org.apache.beam.sdk.transforms.Create.Values Create.Values} for the Dataflow runner in
+   * streaming mode over the Fn API.
+   */
+  private static class StreamingFnApiCreate<T> extends PTransform<PBegin, PCollection<T>> {
+    private final Create.Values<T> transform;
+    private final PCollection<T> originalOutput;
+
+    private StreamingFnApiCreate(
+        Create.Values<T> transform,
+        PCollection<T> originalOutput) {
+      this.transform = transform;
+      this.originalOutput = originalOutput;
+    }
+
+    @Override
+    public final PCollection<T> expand(PBegin input) {
+      try {
+        PCollection<T> pc = Pipeline
+            .applyTransform(input, new Impulse(IsBounded.BOUNDED))
+            .apply(ParDo.of(DecodeAndEmitDoFn
+                .fromIterable(transform.getElements(), originalOutput.getCoder())));
+        pc.setCoder(originalOutput.getCoder());
+        return pc;
+      } catch (IOException e) {
+        throw new IllegalStateException("Unable to encode elements.", e);
+      }
+    }
+
+    /**
+     * A DoFn which stores encoded versions of elements and a representation of a Coder
+     * capable of decoding those elements.
+     *
+     * <p>TODO: BEAM-2422 - Make this a SplittableDoFn.
+     */
+    private static class DecodeAndEmitDoFn<T> extends DoFn<byte[], T> {
+      public static <T> DecodeAndEmitDoFn<T> fromIterable(Iterable<T> elements, Coder<T> elemCoder)
+          throws IOException {
+        ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder();
+        for (T element : elements) {
+          byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
+          allElementsBytes.add(bytes);
+        }
+        return new DecodeAndEmitDoFn<>(allElementsBytes.build(), elemCoder);
+      }
+
+      private final Collection<byte[]> elements;
+      private final RunnerApi.MessageWithComponents coderSpec;
+
+      private DecodeAndEmitDoFn(Collection<byte[]> elements, Coder<T> coder) throws IOException {
+        this.elements = elements;
+        this.coderSpec = CoderTranslation.toProto(coder);
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) throws IOException {
+        Coder<T> coder =
+            (Coder) CoderTranslation.fromProto(coderSpec.getCoder(), coderSpec.getComponents());
+        for (byte[] element : elements) {
+          context.output(CoderUtils.decodeFromByteArray(coder, element));
+        }
+      }
+    }
+  }
+
+  /** The Dataflow-specific override for the Impulse primitive. */
+  private static class Impulse extends PTransform<PBegin, PCollection<byte[]>> {
+    private final IsBounded isBounded;
+
+    private Impulse(IsBounded isBounded) {
+      this.isBounded = isBounded;
+    }
+
+    @Override
+    public PCollection<byte[]> expand(PBegin input) {
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), WindowingStrategy.globalDefault(), isBounded);
+    }
+
+    @Override
+    protected Coder<?> getDefaultOutputCoder() {
+      return ByteArrayCoder.of();
+    }
+
+    private static class Translator implements TransformTranslator<Impulse> {
+      @Override
+      public void translate(Impulse transform, TranslationContext context) {
+        if (context.getPipelineOptions().isStreaming()) {
+          StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
+          stepContext.addInput(PropertyNames.FORMAT, "pubsub");
+          stepContext.addInput(PropertyNames.PUBSUB_SUBSCRIPTION, "_starting_signal/");
+          stepContext.addOutput(context.getOutput(transform));
+        } else {
+          throw new UnsupportedOperationException(
+              "Impulse source for batch pipelines has not been defined.");
+        }
+      }
+    }
+
+    static {
+      DataflowPipelineTranslator.registerTransformTranslator(Impulse.class, new Translator());
+    }
+  }
+
+  /**
    * Specialized implementation for
    * {@link org.apache.beam.sdk.io.Read.Unbounded Read.Unbounded} for the
    * Dataflow runner in streaming mode.
@@ -998,9 +1134,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     /** Builds an instance of this class from the overridden transform. */
     @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public StreamingUnboundedRead(DataflowRunner runner,
-        Read.Unbounded<T> transform,
-        PCollection<T> originalOutput) {
+    public StreamingUnboundedRead(DataflowRunner runner, Read.Unbounded<T> transform) {
       this.source = transform.getSource();
     }
 
@@ -1115,9 +1249,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     /** Builds an instance of this class from the overridden transform. */
     @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public StreamingBoundedRead(DataflowRunner runner,
-        Read.Bounded<T> transform,
-        PCollection<T> originalOutput) {
+    public StreamingBoundedRead(DataflowRunner runner, Read.Bounded<T> transform) {
       this.source = transform.getSource();
     }