You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/11 19:37:17 UTC
[1/2] beam git commit: [BEAM-2260] Improve construction-time errors
for Text and AvroIO
Repository: beam
Updated Branches:
refs/heads/master 25f6358d6 -> 6d9e91b29
[BEAM-2260] Improve construction-time errors for Text and AvroIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/711f79b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/711f79b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/711f79b3
Branch: refs/heads/master
Commit: 711f79b338b260185475df6d54af898f7dc51b9d
Parents: 25f6358
Author: Dan Halperin <dh...@google.com>
Authored: Thu May 11 10:45:20 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 11 12:37:07 2017 -0700
----------------------------------------------------------------------
.../main/java/org/apache/beam/sdk/io/AvroIO.java | 2 ++
.../main/java/org/apache/beam/sdk/io/TextIO.java | 2 ++
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 16 ++++++++++++++++
.../java/org/apache/beam/sdk/io/TextIOTest.java | 12 +++++++++++-
4 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/711f79b3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index c7e7233..d13c6ff 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -441,6 +441,8 @@ public class AvroIO {
"Cannot set a filename policy and also a filename template or suffix.");
checkState(getSchema() != null,
"Need to set the schema of an AvroIO.Write transform.");
+ checkState(!getWindowedWrites() || (getFilenamePolicy() != null),
+ "When using windowed writes, a filename policy must be set via withFilenamePolicy().");
FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
if (usedFilenamePolicy == null) {
http://git-wip-us.apache.org/repos/asf/beam/blob/711f79b3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index dbfaeee..af6a069 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -430,6 +430,8 @@ public class TextIO {
(getFilenamePolicy() == null)
|| (getShardTemplate() == null && getFilenameSuffix() == null),
"Cannot set a filename policy and also a filename template or suffix.");
+ checkState(!getWindowedWrites() || (getFilenamePolicy() != null),
+ "When using windowed writes, a filename policy must be set via withFilenamePolicy().");
FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
if (usedFilenamePolicy == null) {
http://git-wip-us.apache.org/repos/asf/beam/blob/711f79b3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 47b847f..d71f2f7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -54,6 +54,7 @@ import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
@@ -77,6 +78,7 @@ import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -93,6 +95,9 @@ public class AvroIOTest {
@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
@Test
public void testAvroIOGetName() {
assertEquals("AvroIO.Read", AvroIO.read(String.class).from("/tmp/foo*/baz").getName());
@@ -560,4 +565,15 @@ public class AvroIOTest {
assertThat(displayData, hasDisplayItem("numShards", 100));
assertThat(displayData, hasDisplayItem("codec", CodecFactory.snappyCodec().toString()));
}
+
+ @Test
+ public void testWindowedWriteRequiresFilenamePolicy() {
+ PCollection<String> emptyInput = p.apply(Create.empty(StringUtf8Coder.of()));
+ AvroIO.Write write = AvroIO.write(String.class).to("/tmp/some/file").withWindowedWrites();
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage(
+ "When using windowed writes, a filename policy must be set via withFilenamePolicy()");
+ emptyInput.apply(write);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/711f79b3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 685da82..0d8fbbd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -107,7 +107,6 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
@SuppressWarnings("unchecked")
public class TextIOTest {
-
private static final String MY_HEADER = "myHeader";
private static final String MY_FOOTER = "myFooter";
private static final String[] EMPTY = new String[] {};
@@ -1103,5 +1102,16 @@ public class TextIOTest {
assertThat(splits, hasSize(equalTo(1)));
SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
}
+
+ @Test
+ public void testWindowedWriteRequiresFilenamePolicy() {
+ PCollection<String> emptyInput = p.apply(Create.empty(StringUtf8Coder.of()));
+ TextIO.Write write = TextIO.write().to("/tmp/some/file").withWindowedWrites();
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage(
+ "When using windowed writes, a filename policy must be set via withFilenamePolicy()");
+ emptyInput.apply(write);
+ }
}
[2/2] beam git commit: This closes #3084
Posted by dh...@apache.org.
This closes #3084
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6d9e91b2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6d9e91b2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6d9e91b2
Branch: refs/heads/master
Commit: 6d9e91b29665a74f6d2b9a0ebd17e902714ca6e6
Parents: 25f6358 711f79b
Author: Dan Halperin <dh...@google.com>
Authored: Thu May 11 12:37:09 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 11 12:37:09 2017 -0700
----------------------------------------------------------------------
.../main/java/org/apache/beam/sdk/io/AvroIO.java | 2 ++
.../main/java/org/apache/beam/sdk/io/TextIO.java | 2 ++
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 16 ++++++++++++++++
.../java/org/apache/beam/sdk/io/TextIOTest.java | 12 +++++++++++-
4 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------