You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/08/11 17:47:23 UTC
[1/3] incubator-beam git commit: Improve Write Error Message
Repository: incubator-beam
Updated Branches:
refs/heads/master 1d9ad85cc -> 3a858ee9e
Improve Write Error Message
If provided with an Unbounded PCollection, Write will fail due to the
restriction that finalize may be called only once. This failure currently
surfaces as a deep stack trace caused by the inability to apply a GroupByKey.
Instead, throw immediately on application with a specific error message.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0e35a9b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0e35a9b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0e35a9b5
Branch: refs/heads/master
Commit: 0e35a9b5e2e7e3c064ffe0beae7176923d1b9679
Parents: 1d9ad85
Author: Thomas Groh <tg...@google.com>
Authored: Mon Aug 8 19:09:58 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 11 10:31:39 2016 -0700
----------------------------------------------------------------------
.../direct/WriteWithShardingFactory.java | 4 ++++
.../main/java/org/apache/beam/sdk/io/Write.java | 5 ++++
.../java/org/apache/beam/sdk/io/WriteTest.java | 24 ++++++++++++++++----
3 files changed, 28 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e35a9b5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index cee4001..c2157b8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
@@ -74,6 +75,9 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
@Override
public PDone apply(PCollection<T> input) {
+ checkArgument(IsBounded.BOUNDED == input.isBounded(),
+ "%s can only be applied to a Bounded PCollection",
+ getClass().getSimpleName());
PCollection<T> records = input.apply("RewindowInputs",
Window.<T>into(new GlobalWindows()).triggering(DefaultTrigger.of())
.withAllowedLateness(Duration.ZERO)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e35a9b5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index a846b7c..fea65ef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.beam.sdk.Pipeline;
@@ -40,6 +41,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
@@ -106,6 +108,9 @@ public class Write {
@Override
public PDone apply(PCollection<T> input) {
+ checkArgument(IsBounded.BOUNDED == input.isBounded(),
+ "%s can only be applied to a Bounded PCollection",
+ Write.class.getSimpleName());
PipelineOptions options = input.getPipeline().getOptions();
sink.validate(options);
return createWrite(input, sink.createWriteOperation(options));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e35a9b5/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 705b77c..6b44b6a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -19,8 +19,6 @@ package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
-import static org.apache.beam.sdk.values.KV.of;
-
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
@@ -30,8 +28,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-import static java.util.concurrent.ThreadLocalRandom.current;
-
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
@@ -56,14 +52,17 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -76,6 +75,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -83,6 +83,8 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
@RunWith(JUnit4.class)
public class WriteTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
// Static store that can be accessed within the writer
private static List<String> sinkContents = new ArrayList<>();
// Static count of output shards
@@ -108,7 +110,7 @@ public class WriteTest {
@ProcessElement
public void processElement(ProcessContext c) {
- c.output(of(current().nextInt(), c.element()));
+ c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
}
}
@@ -289,6 +291,18 @@ public class WriteTest {
assertThat(displayData, hasDisplayItem("numShards", 1));
}
+ @Test
+ public void testWriteUnbounded() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<String> unbounded =
+ p.apply(Create.of("foo")).setIsBoundedInternal(IsBounded.UNBOUNDED);
+
+ TestSink sink = new TestSink();
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Write can only be applied to a Bounded PCollection");
+ unbounded.apply(Write.to(sink));
+ }
+
/**
* Performs a Write transform and verifies the Write transform calls the appropriate methods on
* a test sink in the correct order, as well as verifies that the elements of a PCollection are
[2/3] incubator-beam git commit: Remove Streaming Write Overrides in
DataflowRunner
Posted by lc...@apache.org.
Remove Streaming Write Overrides in DataflowRunner
These writes should be forbidden based on the boundedness of the input
PCollection. Because Write explicitly forbids applying the transform to an
Unbounded PCollection, this will be equivalent in most cases; in cases
where the input PCollection is Bounded due to an
UnboundedReadFromBoundedSource, the write will function as expected and
does not need to be forbidden.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aa380d87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aa380d87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aa380d87
Branch: refs/heads/master
Commit: aa380d87d4cc429277482ee67118c0515633f8cb
Parents: 0e35a9b
Author: Thomas Groh <tg...@google.com>
Authored: Tue Aug 9 10:47:09 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 11 10:31:40 2016 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 72 +-------------------
.../runners/dataflow/DataflowRunnerTest.java | 24 -------
.../java/org/apache/beam/sdk/io/WriteTest.java | 12 +++-
3 files changed, 11 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aa380d87/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index bea6264..76dbecf 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow;
import static org.apache.beam.sdk.util.StringUtils.approximatePTransformName;
import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows;
-
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
@@ -58,14 +57,12 @@ import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.io.PubsubUnboundedSink;
import org.apache.beam.sdk.io.PubsubUnboundedSource;
import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -143,6 +140,7 @@ import com.google.common.collect.Multimap;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
@@ -172,6 +170,7 @@ import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+
import javax.annotation.Nullable;
/**
@@ -329,11 +328,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class);
builder.put(View.AsList.class, StreamingViewAsList.class);
builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
- builder.put(Write.Bound.class, StreamingWrite.class);
builder.put(Read.Unbounded.class, StreamingUnboundedRead.class);
builder.put(Read.Bounded.class, StreamingBoundedRead.class);
- builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class);
- builder.put(TextIO.Write.Bound.class, UnsupportedIO.class);
builder.put(Window.Bound.class, AssignWindows.class);
// In streaming mode must use either the custom Pubsub unbounded source/sink or
// defer to Windmill's built-in implementation.
@@ -2045,30 +2041,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
- /**
- * Specialized (non-)implementation for
- * {@link org.apache.beam.sdk.io.Write.Bound Write.Bound}
- * for the Dataflow runner in streaming mode.
- */
- private static class StreamingWrite<T> extends PTransform<PCollection<T>, PDone> {
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public StreamingWrite(DataflowRunner runner, Write.Bound<T> transform) { }
-
- @Override
- public PDone apply(PCollection<T> input) {
- throw new UnsupportedOperationException(
- "The Write transform is not supported by the Dataflow streaming runner.");
- }
-
- @Override
- protected String getKindString() {
- return "StreamingWrite";
- }
- }
-
// ================================================================================
// PubsubIO translations
// ================================================================================
@@ -2727,51 +2699,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public UnsupportedIO(DataflowRunner runner, AvroIO.Read.Bound<?> transform) {
- this.transform = transform;
- }
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public UnsupportedIO(DataflowRunner runner, TextIO.Read.Bound<?> transform) {
- this.transform = transform;
- }
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public UnsupportedIO(DataflowRunner runner, Read.Bounded<?> transform) {
- this.transform = transform;
- }
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
public UnsupportedIO(DataflowRunner runner, Read.Unbounded<?> transform) {
this.transform = transform;
}
/**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public UnsupportedIO(DataflowRunner runner, AvroIO.Write.Bound<?> transform) {
- this.transform = transform;
- }
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public UnsupportedIO(DataflowRunner runner, TextIO.Write.Bound<?> transform) {
- this.transform = transform;
- }
-
- /**
* Builds an instance of this class from the overridden doFn.
*/
@SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aa380d87/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 704410d..d7deffd 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -55,7 +55,6 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -82,7 +81,6 @@ import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
@@ -956,28 +954,6 @@ public class DataflowRunnerTest {
testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false);
}
- private void testUnsupportedSink(
- PTransform<PCollection<String>, PDone> sink, String name, boolean streaming)
- throws Exception {
- thrown.expect(UnsupportedOperationException.class);
- thrown.expectMessage(
- "The DataflowRunner in streaming mode does not support " + name);
-
- Pipeline p = Pipeline.create(makeOptions(streaming));
- p.apply(Create.of("foo")).apply(sink);
- p.run();
- }
-
- @Test
- public void testAvroIOSinkUnsupportedInStreaming() throws Exception {
- testUnsupportedSink(AvroIO.Write.to("foo").withSchema(String.class), "AvroIO.Write", true);
- }
-
- @Test
- public void testTextIOSinkUnsupportedInStreaming() throws Exception {
- testUnsupportedSink(TextIO.Write.to("foo"), "TextIO.Write", true);
- }
-
@Test
public void testBatchViewAsSingletonToIsmRecord() throws Exception {
DoFnTester<KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aa380d87/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 6b44b6a..f9bf472 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -52,7 +52,6 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
@@ -294,8 +293,8 @@ public class WriteTest {
@Test
public void testWriteUnbounded() {
TestPipeline p = TestPipeline.create();
- PCollection<String> unbounded =
- p.apply(Create.of("foo")).setIsBoundedInternal(IsBounded.UNBOUNDED);
+ PCollection<String> unbounded = p.apply(CountingInput.unbounded())
+ .apply(MapElements.via(new ToStringFn()));
TestSink sink = new TestSink();
thrown.expect(IllegalArgumentException.class);
@@ -303,6 +302,13 @@ public class WriteTest {
unbounded.apply(Write.to(sink));
}
+ private static class ToStringFn extends SimpleFunction<Long, String> {
+ @Override
+ public String apply(Long input) {
+ return Long.toString(input);
+ }
+ }
+
/**
* Performs a Write transform and verifies the Write transform calls the appropriate methods on
* a test sink in the correct order, as well as verifies that the elements of a PCollection are
[3/3] incubator-beam git commit: Improve Write Failure Message
Posted by lc...@apache.org.
Improve Write Failure Message
This closes #802
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3a858ee9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3a858ee9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3a858ee9
Branch: refs/heads/master
Commit: 3a858ee9eb9f2ebd8a715d048c0abd90b1328a1f
Parents: 1d9ad85 aa380d8
Author: Luke Cwik <lc...@google.com>
Authored: Thu Aug 11 10:32:09 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 11 10:32:09 2016 -0700
----------------------------------------------------------------------
.../direct/WriteWithShardingFactory.java | 4 ++
.../beam/runners/dataflow/DataflowRunner.java | 72 +-------------------
.../runners/dataflow/DataflowRunnerTest.java | 24 -------
.../main/java/org/apache/beam/sdk/io/Write.java | 5 ++
.../java/org/apache/beam/sdk/io/WriteTest.java | 30 ++++++--
5 files changed, 36 insertions(+), 99 deletions(-)
----------------------------------------------------------------------