You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/18 23:05:21 UTC

[1/2] beam git commit: [BEAM-2544] Fix flaky AvroIOTest by eliminating race condition in "write then read" tests.

Repository: beam
Updated Branches:
  refs/heads/master dd9e866e0 -> be5b9347b


[BEAM-2544] Fix flaky AvroIOTest by eliminating race condition in "write then read" tests.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/911edbad
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/911edbad
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/911edbad

Branch: refs/heads/master
Commit: 911edbade388a63626e0ad6f8b7c2ad7a9f9b7c2
Parents: dd9e866
Author: Alex Filatov <al...@users.noreply.github.com>
Authored: Thu Jun 29 23:23:04 2017 +0300
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jul 18 15:49:44 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 46 +++++++++++---------
 1 file changed, 25 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/911edbad/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 4a1386c..4380c57 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -90,7 +90,11 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class AvroIOTest {
 
-  @Rule public TestPipeline p = TestPipeline.create();
+  @Rule
+  public TestPipeline writePipeline = TestPipeline.create();
+
+  @Rule
+  public TestPipeline readPipeline = TestPipeline.create();
 
   @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
 
@@ -144,15 +148,15 @@ public class AvroIOTest {
         ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
 
-    p.apply(Create.of(values))
+    writePipeline.apply(Create.of(values))
         .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
-    p.run();
+    writePipeline.run().waitUntilFinish();
 
     PCollection<GenericClass> input =
-        p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
+        readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
 
     PAssert.that(input).containsInAnyOrder(values);
-    p.run();
+    readPipeline.run();
   }
 
   @Test
@@ -163,19 +167,19 @@ public class AvroIOTest {
         ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
 
-    p.apply(Create.of(values))
+    writePipeline.apply(Create.of(values))
         .apply(
             AvroIO.write(GenericClass.class)
                 .to(outputFile.getAbsolutePath())
                 .withoutSharding()
                 .withCodec(CodecFactory.deflateCodec(9)));
-    p.run();
+    writePipeline.run().waitUntilFinish();
 
     PCollection<GenericClass> input =
-        p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
+        readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
 
     PAssert.that(input).containsInAnyOrder(values);
-    p.run();
+    readPipeline.run();
     DataFileStream dataFileStream =
         new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader());
     assertEquals("deflate", dataFileStream.getMetaString("avro.codec"));
@@ -189,19 +193,19 @@ public class AvroIOTest {
         ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
 
-    p.apply(Create.of(values))
+    writePipeline.apply(Create.of(values))
         .apply(
             AvroIO.write(GenericClass.class)
                 .to(outputFile.getAbsolutePath())
                 .withoutSharding()
                 .withCodec(CodecFactory.nullCodec()));
-    p.run();
+    writePipeline.run().waitUntilFinish();
 
     PCollection<GenericClass> input =
-        p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
+        readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
 
     PAssert.that(input).containsInAnyOrder(values);
-    p.run();
+    readPipeline.run();
     DataFileStream dataFileStream =
         new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader());
     assertEquals("null", dataFileStream.getMetaString("avro.codec"));
@@ -261,18 +265,18 @@ public class AvroIOTest {
         ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
 
-    p.apply(Create.of(values))
+    writePipeline.apply(Create.of(values))
         .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
-    p.run();
+    writePipeline.run().waitUntilFinish();
 
     List<GenericClassV2> expected =
         ImmutableList.of(new GenericClassV2(3, "hi", null), new GenericClassV2(5, "bar", null));
 
     PCollection<GenericClassV2> input =
-        p.apply(AvroIO.read(GenericClassV2.class).from(outputFile.getAbsolutePath()));
+        readPipeline.apply(AvroIO.read(GenericClassV2.class).from(outputFile.getAbsolutePath()));
 
     PAssert.that(input).containsInAnyOrder(expected);
-    p.run();
+    readPipeline.run();
   }
 
   private static class WindowedFilenamePolicy extends FilenamePolicy {
@@ -467,7 +471,7 @@ public class AvroIOTest {
         ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
 
-    p.apply(Create.of(values))
+    writePipeline.apply(Create.of(values))
         .apply(
             AvroIO.write(GenericClass.class)
                 .to(outputFile.getAbsolutePath())
@@ -480,7 +484,7 @@ public class AvroIOTest {
                         100L,
                         "bytesKey",
                         "bytesValue".getBytes())));
-    p.run();
+    writePipeline.run();
 
     DataFileStream dataFileStream =
         new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader());
@@ -502,8 +506,8 @@ public class AvroIOTest {
       System.out.println("no sharding");
       write = write.withoutSharding();
     }
-    p.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write);
-    p.run();
+    writePipeline.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write);
+    writePipeline.run();
 
     String shardNameTemplate =
         firstNonNull(


[2/2] beam git commit: This closes #3475: [BEAM-2544] Fix flaky AvroIOTest

Posted by jk...@apache.org.
This closes #3475: [BEAM-2544] Fix flaky AvroIOTest


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/be5b9347
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/be5b9347
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/be5b9347

Branch: refs/heads/master
Commit: be5b9347bc44de1f042c76e1ba3f47a13772c72b
Parents: dd9e866 911edba
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jul 18 15:49:54 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jul 18 15:49:54 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 46 +++++++++++---------
 1 file changed, 25 insertions(+), 21 deletions(-)
----------------------------------------------------------------------