You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/11 19:37:17 UTC

[1/2] beam git commit: [BEAM-2260] Improve construction-time errors for Text and AvroIO

Repository: beam
Updated Branches:
  refs/heads/master 25f6358d6 -> 6d9e91b29


[BEAM-2260] Improve construction-time errors for Text and AvroIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/711f79b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/711f79b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/711f79b3

Branch: refs/heads/master
Commit: 711f79b338b260185475df6d54af898f7dc51b9d
Parents: 25f6358
Author: Dan Halperin <dh...@google.com>
Authored: Thu May 11 10:45:20 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 11 12:37:07 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/io/AvroIO.java    |  2 ++
 .../main/java/org/apache/beam/sdk/io/TextIO.java    |  2 ++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java     | 16 ++++++++++++++++
 .../java/org/apache/beam/sdk/io/TextIOTest.java     | 12 +++++++++++-
 4 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/711f79b3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index c7e7233..d13c6ff 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -441,6 +441,8 @@ public class AvroIO {
           "Cannot set a filename policy and also a filename template or suffix.");
       checkState(getSchema() != null,
           "Need to set the schema of an AvroIO.Write transform.");
+      checkState(!getWindowedWrites() || (getFilenamePolicy() != null),
+          "When using windowed writes, a filename policy must be set via withFilenamePolicy().");
 
       FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
       if (usedFilenamePolicy == null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/711f79b3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index dbfaeee..af6a069 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -430,6 +430,8 @@ public class TextIO {
           (getFilenamePolicy() == null)
               || (getShardTemplate() == null && getFilenameSuffix() == null),
           "Cannot set a filename policy and also a filename template or suffix.");
+      checkState(!getWindowedWrites() || (getFilenamePolicy() != null),
+          "When using windowed writes, a filename policy must be set via withFilenamePolicy().");
 
       FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
       if (usedFilenamePolicy == null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/711f79b3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 47b847f..d71f2f7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -54,6 +54,7 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -77,6 +78,7 @@ import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -93,6 +95,9 @@ public class AvroIOTest {
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Test
   public void testAvroIOGetName() {
     assertEquals("AvroIO.Read", AvroIO.read(String.class).from("/tmp/foo*/baz").getName());
@@ -560,4 +565,15 @@ public class AvroIOTest {
     assertThat(displayData, hasDisplayItem("numShards", 100));
     assertThat(displayData, hasDisplayItem("codec", CodecFactory.snappyCodec().toString()));
   }
+
+  @Test
+  public void testWindowedWriteRequiresFilenamePolicy() {
+    PCollection<String> emptyInput = p.apply(Create.empty(StringUtf8Coder.of()));
+    AvroIO.Write write = AvroIO.write(String.class).to("/tmp/some/file").withWindowedWrites();
+
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage(
+        "When using windowed writes, a filename policy must be set via withFilenamePolicy()");
+    emptyInput.apply(write);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/711f79b3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 685da82..0d8fbbd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -107,7 +107,6 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 @SuppressWarnings("unchecked")
 public class TextIOTest {
-
   private static final String MY_HEADER = "myHeader";
   private static final String MY_FOOTER = "myFooter";
   private static final String[] EMPTY = new String[] {};
@@ -1103,5 +1102,16 @@ public class TextIOTest {
     assertThat(splits, hasSize(equalTo(1)));
     SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
   }
+
+  @Test
+  public void testWindowedWriteRequiresFilenamePolicy() {
+    PCollection<String> emptyInput = p.apply(Create.empty(StringUtf8Coder.of()));
+    TextIO.Write write = TextIO.write().to("/tmp/some/file").withWindowedWrites();
+
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage(
+        "When using windowed writes, a filename policy must be set via withFilenamePolicy()");
+    emptyInput.apply(write);
+  }
 }
 


[2/2] beam git commit: This closes #3084

Posted by dh...@apache.org.
This closes #3084


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6d9e91b2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6d9e91b2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6d9e91b2

Branch: refs/heads/master
Commit: 6d9e91b29665a74f6d2b9a0ebd17e902714ca6e6
Parents: 25f6358 711f79b
Author: Dan Halperin <dh...@google.com>
Authored: Thu May 11 12:37:09 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 11 12:37:09 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/io/AvroIO.java    |  2 ++
 .../main/java/org/apache/beam/sdk/io/TextIO.java    |  2 ++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java     | 16 ++++++++++++++++
 .../java/org/apache/beam/sdk/io/TextIOTest.java     | 12 +++++++++++-
 4 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------