You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/06/07 20:48:13 UTC

[2/3] beam git commit: This closes #2286

This closes #2286


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5f7e73bb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5f7e73bb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5f7e73bb

Branch: refs/heads/master
Commit: 5f7e73bbacf7096eed44002a54910a560b195801
Parents: b2de3db 8f4fa43
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Jun 7 19:43:19 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jun 7 13:41:20 2017 -0700

----------------------------------------------------------------------
 .../translation/types/CoderTypeSerializer.java  |  41 ++-
 .../streaming/io/UnboundedSourceWrapper.java    |   2 +
 .../flink/streaming/TestCountingSource.java     |  48 ++-
 .../streaming/UnboundedSourceWrapperTest.java   | 309 +++++++++++--------
 .../beam/runners/dataflow/DataflowRunner.java   |  24 +-
 5 files changed, 269 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5f7e73bb/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --cc runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index cce6ce7,cce6ce7..ed29330
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@@ -428,12 -428,12 +428,15 @@@ public class DataflowRunner extends Pip
      public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
          AppliedPTransform<PBegin, PCollection<T>, PTransform<PInput, PCollection<T>>> transform) {
        PTransform<PInput, PCollection<T>> original = transform.getTransform();
++      PCollection<T> output =
++          (PCollection) Iterables.getOnlyElement(transform.getOutputs().values());
        return PTransformReplacement.of(
            transform.getPipeline().begin(),
            InstanceBuilder.ofType(replacement)
                .withArg(DataflowRunner.class, runner)
                .withArg(
                    (Class<? super PTransform<PInput, PCollection<T>>>) original.getClass(), original)
++              .withArg((Class<? super PCollection<T>>) output.getClass(), output)
                .build());
      }
  
@@@ -809,11 -809,11 +812,12 @@@
        extends PTransform<PBegin, PCollection<PubsubMessage>> {
      private final PubsubUnboundedSource transform;
  
--    /**
--     * Builds an instance of this class from the overridden transform.
--     */
++    /** Builds an instance of this class from the overridden transform. */
++    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
      public StreamingPubsubIORead(
--        DataflowRunner runner, PubsubUnboundedSource transform) {
++        DataflowRunner runner,
++        PubsubUnboundedSource transform,
++        PCollection<PubsubMessage> originalOutput) {
        this.transform = transform;
      }
  
@@@ -992,11 -992,11 +996,11 @@@
    private static class StreamingUnboundedRead<T> extends PTransform<PBegin, PCollection<T>> {
      private final UnboundedSource<T, ?> source;
  
--    /**
--     * Builds an instance of this class from the overridden transform.
--     */
++    /** Builds an instance of this class from the overridden transform. */
      @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
--    public StreamingUnboundedRead(DataflowRunner runner, Read.Unbounded<T> transform) {
++    public StreamingUnboundedRead(DataflowRunner runner,
++        Read.Unbounded<T> transform,
++        PCollection<T> originalOutput) {
        this.source = transform.getSource();
      }
  
@@@ -1111,7 -1111,7 +1115,9 @@@
  
      /** Builds an instance of this class from the overridden transform. */
      @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
--    public StreamingBoundedRead(DataflowRunner runner, Read.Bounded<T> transform) {
++    public StreamingBoundedRead(DataflowRunner runner,
++        Read.Bounded<T> transform,
++        PCollection<T> originalOutput) {
        this.source = transform.getSource();
      }