You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/08/11 17:47:23 UTC

[1/3] incubator-beam git commit: Improve Write Error Message

Repository: incubator-beam
Updated Branches:
  refs/heads/master 1d9ad85cc -> 3a858ee9e


Improve Write Error Message

If applied to an Unbounded PCollection, Write will fail because of the
restriction that finalize may be called only once. This failure currently
surfaces in a deep stack trace, caused by it not being possible to apply a
GroupByKey. Instead, throw immediately on application with a specific
error message.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0e35a9b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0e35a9b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0e35a9b5

Branch: refs/heads/master
Commit: 0e35a9b5e2e7e3c064ffe0beae7176923d1b9679
Parents: 1d9ad85
Author: Thomas Groh <tg...@google.com>
Authored: Mon Aug 8 19:09:58 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 11 10:31:39 2016 -0700

----------------------------------------------------------------------
 .../direct/WriteWithShardingFactory.java        |  4 ++++
 .../main/java/org/apache/beam/sdk/io/Write.java |  5 ++++
 .../java/org/apache/beam/sdk/io/WriteTest.java  | 24 ++++++++++++++++----
 3 files changed, 28 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e35a9b5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index cee4001..c2157b8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
@@ -74,6 +75,9 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
 
     @Override
     public PDone apply(PCollection<T> input) {
+      checkArgument(IsBounded.BOUNDED == input.isBounded(),
+          "%s can only be applied to a Bounded PCollection",
+          getClass().getSimpleName());
       PCollection<T> records = input.apply("RewindowInputs",
           Window.<T>into(new GlobalWindows()).triggering(DefaultTrigger.of())
               .withAllowedLateness(Duration.ZERO)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e35a9b5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index a846b7c..fea65ef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.beam.sdk.Pipeline;
@@ -40,6 +41,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 
@@ -106,6 +108,9 @@ public class Write {
 
     @Override
     public PDone apply(PCollection<T> input) {
+      checkArgument(IsBounded.BOUNDED == input.isBounded(),
+          "%s can only be applied to a Bounded PCollection",
+          Write.class.getSimpleName());
       PipelineOptions options = input.getPipeline().getOptions();
       sink.validate(options);
       return createWrite(input, sink.createWriteOperation(options));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e35a9b5/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 705b77c..6b44b6a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -19,8 +19,6 @@ package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
-import static org.apache.beam.sdk.values.KV.of;
-
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
@@ -30,8 +28,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
-import static java.util.concurrent.ThreadLocalRandom.current;
-
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
@@ -56,14 +52,17 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
 
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -76,6 +75,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -83,6 +83,8 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 @RunWith(JUnit4.class)
 public class WriteTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
   // Static store that can be accessed within the writer
   private static List<String> sinkContents = new ArrayList<>();
   // Static count of output shards
@@ -108,7 +110,7 @@ public class WriteTest {
 
       @ProcessElement
       public void processElement(ProcessContext c) {
-        c.output(of(current().nextInt(), c.element()));
+        c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
       }
     }
 
@@ -289,6 +291,18 @@ public class WriteTest {
     assertThat(displayData, hasDisplayItem("numShards", 1));
   }
 
+  @Test
+  public void testWriteUnbounded() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<String> unbounded =
+        p.apply(Create.of("foo")).setIsBoundedInternal(IsBounded.UNBOUNDED);
+
+    TestSink sink = new TestSink();
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Write can only be applied to a Bounded PCollection");
+    unbounded.apply(Write.to(sink));
+  }
+
   /**
    * Performs a Write transform and verifies the Write transform calls the appropriate methods on
    * a test sink in the correct order, as well as verifies that the elements of a PCollection are


[2/3] incubator-beam git commit: Remove Streaming Write Overrides in DataflowRunner

Posted by lc...@apache.org.
Remove Streaming Write Overrides in DataflowRunner

These writes should be forbidden based on the boundedness of the input
PCollection. As Write explicitly forbids the application of the
transform to an Unbounded PCollection, this will be equivalent in most
cases. In cases where the input PCollection is Bounded, due to an
UnboundedReadFromBoundedSource, the write will function as expected and
does not need to be forbidden.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aa380d87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aa380d87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aa380d87

Branch: refs/heads/master
Commit: aa380d87d4cc429277482ee67118c0515633f8cb
Parents: 0e35a9b
Author: Thomas Groh <tg...@google.com>
Authored: Tue Aug 9 10:47:09 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 11 10:31:40 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 72 +-------------------
 .../runners/dataflow/DataflowRunnerTest.java    | 24 -------
 .../java/org/apache/beam/sdk/io/WriteTest.java  | 12 +++-
 3 files changed, 11 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aa380d87/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index bea6264..76dbecf 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow;
 import static org.apache.beam.sdk.util.StringUtils.approximatePTransformName;
 import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
 import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows;
-
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
@@ -58,14 +57,12 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.PubsubIO;
 import org.apache.beam.sdk.io.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.PubsubUnboundedSource;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -143,6 +140,7 @@ import com.google.common.collect.Multimap;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.joda.time.DateTimeUtils;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
@@ -172,6 +170,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+
 import javax.annotation.Nullable;
 
 /**
@@ -329,11 +328,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class);
       builder.put(View.AsList.class, StreamingViewAsList.class);
       builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
-      builder.put(Write.Bound.class, StreamingWrite.class);
       builder.put(Read.Unbounded.class, StreamingUnboundedRead.class);
       builder.put(Read.Bounded.class, StreamingBoundedRead.class);
-      builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class);
-      builder.put(TextIO.Write.Bound.class, UnsupportedIO.class);
       builder.put(Window.Bound.class, AssignWindows.class);
       // In streaming mode must use either the custom Pubsub unbounded source/sink or
       // defer to Windmill's built-in implementation.
@@ -2045,30 +2041,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
   }
 
-  /**
-   * Specialized (non-)implementation for
-   * {@link org.apache.beam.sdk.io.Write.Bound Write.Bound}
-   * for the Dataflow runner in streaming mode.
-   */
-  private static class StreamingWrite<T> extends PTransform<PCollection<T>, PDone> {
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public StreamingWrite(DataflowRunner runner, Write.Bound<T> transform) { }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      throw new UnsupportedOperationException(
-          "The Write transform is not supported by the Dataflow streaming runner.");
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingWrite";
-    }
-  }
-
   // ================================================================================
   // PubsubIO translations
   // ================================================================================
@@ -2727,51 +2699,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
      * Builds an instance of this class from the overridden transform.
      */
     @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public UnsupportedIO(DataflowRunner runner, AvroIO.Read.Bound<?> transform) {
-      this.transform = transform;
-    }
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public UnsupportedIO(DataflowRunner runner, TextIO.Read.Bound<?> transform) {
-      this.transform = transform;
-    }
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public UnsupportedIO(DataflowRunner runner, Read.Bounded<?> transform) {
-      this.transform = transform;
-    }
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
     public UnsupportedIO(DataflowRunner runner, Read.Unbounded<?> transform) {
       this.transform = transform;
     }
 
     /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public UnsupportedIO(DataflowRunner runner, AvroIO.Write.Bound<?> transform) {
-      this.transform = transform;
-    }
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public UnsupportedIO(DataflowRunner runner, TextIO.Write.Bound<?> transform) {
-      this.transform = transform;
-    }
-
-    /**
      * Builds an instance of this class from the overridden doFn.
      */
     @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aa380d87/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 704410d..d7deffd 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -55,7 +55,6 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -82,7 +81,6 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -956,28 +954,6 @@ public class DataflowRunnerTest {
     testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false);
   }
 
-  private void testUnsupportedSink(
-      PTransform<PCollection<String>, PDone> sink, String name, boolean streaming)
-          throws Exception {
-    thrown.expect(UnsupportedOperationException.class);
-    thrown.expectMessage(
-        "The DataflowRunner in streaming mode does not support " + name);
-
-    Pipeline p = Pipeline.create(makeOptions(streaming));
-    p.apply(Create.of("foo")).apply(sink);
-    p.run();
-  }
-
-  @Test
-  public void testAvroIOSinkUnsupportedInStreaming() throws Exception {
-    testUnsupportedSink(AvroIO.Write.to("foo").withSchema(String.class), "AvroIO.Write", true);
-  }
-
-  @Test
-  public void testTextIOSinkUnsupportedInStreaming() throws Exception {
-    testUnsupportedSink(TextIO.Write.to("foo"), "TextIO.Write", true);
-  }
-
   @Test
   public void testBatchViewAsSingletonToIsmRecord() throws Exception {
     DoFnTester<KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aa380d87/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 6b44b6a..f9bf472 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -52,7 +52,6 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
@@ -294,8 +293,8 @@ public class WriteTest {
   @Test
   public void testWriteUnbounded() {
     TestPipeline p = TestPipeline.create();
-    PCollection<String> unbounded =
-        p.apply(Create.of("foo")).setIsBoundedInternal(IsBounded.UNBOUNDED);
+    PCollection<String> unbounded = p.apply(CountingInput.unbounded())
+        .apply(MapElements.via(new ToStringFn()));
 
     TestSink sink = new TestSink();
     thrown.expect(IllegalArgumentException.class);
@@ -303,6 +302,13 @@ public class WriteTest {
     unbounded.apply(Write.to(sink));
   }
 
+  private static class ToStringFn extends SimpleFunction<Long, String> {
+    @Override
+    public String apply(Long input) {
+      return Long.toString(input);
+    }
+  }
+
   /**
    * Performs a Write transform and verifies the Write transform calls the appropriate methods on
    * a test sink in the correct order, as well as verifies that the elements of a PCollection are


[3/3] incubator-beam git commit: Improve Write Failure Message

Posted by lc...@apache.org.
Improve Write Failure Message

This closes #802


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3a858ee9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3a858ee9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3a858ee9

Branch: refs/heads/master
Commit: 3a858ee9eb9f2ebd8a715d048c0abd90b1328a1f
Parents: 1d9ad85 aa380d8
Author: Luke Cwik <lc...@google.com>
Authored: Thu Aug 11 10:32:09 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 11 10:32:09 2016 -0700

----------------------------------------------------------------------
 .../direct/WriteWithShardingFactory.java        |  4 ++
 .../beam/runners/dataflow/DataflowRunner.java   | 72 +-------------------
 .../runners/dataflow/DataflowRunnerTest.java    | 24 -------
 .../main/java/org/apache/beam/sdk/io/Write.java |  5 ++
 .../java/org/apache/beam/sdk/io/WriteTest.java  | 30 ++++++--
 5 files changed, 36 insertions(+), 99 deletions(-)
----------------------------------------------------------------------