Posted to commits@beam.apache.org by lc...@apache.org on 2016/08/11 17:47:24 UTC

[2/3] incubator-beam git commit: Remove Streaming Write Overrides in DataflowRunner

Remove Streaming Write Overrides in DataflowRunner

These writes should be forbidden based on the boundedness of the input
PCollection. Because Write already forbids applying the transform to an
unbounded PCollection, this is equivalent in most cases; in cases where
the input PCollection is Bounded even though it is produced via an
UnboundedReadFromBoundedSource, the write functions as expected and
does not need to be forbidden.
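
For reference, a minimal sketch (not part of this commit) of the SDK-level
check the runner now relies on, mirroring the updated WriteTest below. The
imports are from the incubator-beam Java SDK of this era, and "sink" is a
hypothetical placeholder for any concrete Sink<String> (the real test uses
its own TestSink stub):

    import org.apache.beam.sdk.io.CountingInput;
    import org.apache.beam.sdk.io.Write;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.values.PCollection;

    TestPipeline p = TestPipeline.create();

    // An unbounded PCollection<String>, built from the unbounded counting source.
    PCollection<String> unbounded =
        p.apply(CountingInput.unbounded())
         .apply(MapElements.via(new SimpleFunction<Long, String>() {
           @Override
           public String apply(Long input) {
             return Long.toString(input);
           }
         }));

    // Write.to(...) rejects unbounded input at pipeline construction time with an
    // IllegalArgumentException, so a runner-specific StreamingWrite override is
    // not needed.
    unbounded.apply(Write.to(sink));  // sink: hypothetical concrete Sink<String>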


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aa380d87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aa380d87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aa380d87

Branch: refs/heads/master
Commit: aa380d87d4cc429277482ee67118c0515633f8cb
Parents: 0e35a9b
Author: Thomas Groh <tg...@google.com>
Authored: Tue Aug 9 10:47:09 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 11 10:31:40 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 72 +-------------------
 .../runners/dataflow/DataflowRunnerTest.java    | 24 -------
 .../java/org/apache/beam/sdk/io/WriteTest.java  | 12 +++-
 3 files changed, 11 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aa380d87/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index bea6264..76dbecf 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow;
 import static org.apache.beam.sdk.util.StringUtils.approximatePTransformName;
 import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
 import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows;
-
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
@@ -58,14 +57,12 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.PubsubIO;
 import org.apache.beam.sdk.io.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.PubsubUnboundedSource;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -143,6 +140,7 @@ import com.google.common.collect.Multimap;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.joda.time.DateTimeUtils;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
@@ -172,6 +170,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+
 import javax.annotation.Nullable;
 
 /**
@@ -329,11 +328,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class);
       builder.put(View.AsList.class, StreamingViewAsList.class);
       builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
-      builder.put(Write.Bound.class, StreamingWrite.class);
       builder.put(Read.Unbounded.class, StreamingUnboundedRead.class);
       builder.put(Read.Bounded.class, StreamingBoundedRead.class);
-      builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class);
-      builder.put(TextIO.Write.Bound.class, UnsupportedIO.class);
       builder.put(Window.Bound.class, AssignWindows.class);
       // In streaming mode must use either the custom Pubsub unbounded source/sink or
       // defer to Windmill's built-in implementation.
@@ -2045,30 +2041,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
   }
 
-  /**
-   * Specialized (non-)implementation for
-   * {@link org.apache.beam.sdk.io.Write.Bound Write.Bound}
-   * for the Dataflow runner in streaming mode.
-   */
-  private static class StreamingWrite<T> extends PTransform<PCollection<T>, PDone> {
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public StreamingWrite(DataflowRunner runner, Write.Bound<T> transform) { }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      throw new UnsupportedOperationException(
-          "The Write transform is not supported by the Dataflow streaming runner.");
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingWrite";
-    }
-  }
-
   // ================================================================================
   // PubsubIO translations
   // ================================================================================
@@ -2727,51 +2699,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
      * Builds an instance of this class from the overridden transform.
      */
     @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public UnsupportedIO(DataflowRunner runner, AvroIO.Read.Bound<?> transform) {
-      this.transform = transform;
-    }
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public UnsupportedIO(DataflowRunner runner, TextIO.Read.Bound<?> transform) {
-      this.transform = transform;
-    }
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public UnsupportedIO(DataflowRunner runner, Read.Bounded<?> transform) {
-      this.transform = transform;
-    }
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
     public UnsupportedIO(DataflowRunner runner, Read.Unbounded<?> transform) {
       this.transform = transform;
     }
 
     /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public UnsupportedIO(DataflowRunner runner, AvroIO.Write.Bound<?> transform) {
-      this.transform = transform;
-    }
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public UnsupportedIO(DataflowRunner runner, TextIO.Write.Bound<?> transform) {
-      this.transform = transform;
-    }
-
-    /**
      * Builds an instance of this class from the overridden doFn.
      */
     @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aa380d87/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 704410d..d7deffd 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -55,7 +55,6 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -82,7 +81,6 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -956,28 +954,6 @@ public class DataflowRunnerTest {
     testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false);
   }
 
-  private void testUnsupportedSink(
-      PTransform<PCollection<String>, PDone> sink, String name, boolean streaming)
-          throws Exception {
-    thrown.expect(UnsupportedOperationException.class);
-    thrown.expectMessage(
-        "The DataflowRunner in streaming mode does not support " + name);
-
-    Pipeline p = Pipeline.create(makeOptions(streaming));
-    p.apply(Create.of("foo")).apply(sink);
-    p.run();
-  }
-
-  @Test
-  public void testAvroIOSinkUnsupportedInStreaming() throws Exception {
-    testUnsupportedSink(AvroIO.Write.to("foo").withSchema(String.class), "AvroIO.Write", true);
-  }
-
-  @Test
-  public void testTextIOSinkUnsupportedInStreaming() throws Exception {
-    testUnsupportedSink(TextIO.Write.to("foo"), "TextIO.Write", true);
-  }
-
   @Test
   public void testBatchViewAsSingletonToIsmRecord() throws Exception {
     DoFnTester<KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aa380d87/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 6b44b6a..f9bf472 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -52,7 +52,6 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
@@ -294,8 +293,8 @@ public class WriteTest {
   @Test
   public void testWriteUnbounded() {
     TestPipeline p = TestPipeline.create();
-    PCollection<String> unbounded =
-        p.apply(Create.of("foo")).setIsBoundedInternal(IsBounded.UNBOUNDED);
+    PCollection<String> unbounded = p.apply(CountingInput.unbounded())
+        .apply(MapElements.via(new ToStringFn()));
 
     TestSink sink = new TestSink();
     thrown.expect(IllegalArgumentException.class);
@@ -303,6 +302,13 @@ public class WriteTest {
     unbounded.apply(Write.to(sink));
   }
 
+  private static class ToStringFn extends SimpleFunction<Long, String> {
+    @Override
+    public String apply(Long input) {
+      return Long.toString(input);
+    }
+  }
+
   /**
    * Performs a Write transform and verifies the Write transform calls the appropriate methods on
    * a test sink in the correct order, as well as verifies that the elements of a PCollection are