You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 19:53:26 UTC
[30/50] [abbrv] beam git commit: [BEAM-2544] Fix flaky AvroIOTest by eliminating race condition in "write then read" tests.
[BEAM-2544] Fix flaky AvroIOTest by eliminating race condition in "write then read" tests.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/911edbad
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/911edbad
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/911edbad
Branch: refs/heads/DSL_SQL
Commit: 911edbade388a63626e0ad6f8b7c2ad7a9f9b7c2
Parents: dd9e866
Author: Alex Filatov <al...@users.noreply.github.com>
Authored: Thu Jun 29 23:23:04 2017 +0300
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jul 18 15:49:44 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 46 +++++++++++---------
1 file changed, 25 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/911edbad/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 4a1386c..4380c57 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -90,7 +90,11 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class AvroIOTest {
- @Rule public TestPipeline p = TestPipeline.create();
+ @Rule
+ public TestPipeline writePipeline = TestPipeline.create();
+
+ @Rule
+ public TestPipeline readPipeline = TestPipeline.create();
@Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -144,15 +148,15 @@ public class AvroIOTest {
ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
- p.apply(Create.of(values))
+ writePipeline.apply(Create.of(values))
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
- p.run();
+ writePipeline.run().waitUntilFinish();
PCollection<GenericClass> input =
- p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
+ readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
PAssert.that(input).containsInAnyOrder(values);
- p.run();
+ readPipeline.run();
}
@Test
@@ -163,19 +167,19 @@ public class AvroIOTest {
ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
- p.apply(Create.of(values))
+ writePipeline.apply(Create.of(values))
.apply(
AvroIO.write(GenericClass.class)
.to(outputFile.getAbsolutePath())
.withoutSharding()
.withCodec(CodecFactory.deflateCodec(9)));
- p.run();
+ writePipeline.run().waitUntilFinish();
PCollection<GenericClass> input =
- p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
+ readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
PAssert.that(input).containsInAnyOrder(values);
- p.run();
+ readPipeline.run();
DataFileStream dataFileStream =
new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader());
assertEquals("deflate", dataFileStream.getMetaString("avro.codec"));
@@ -189,19 +193,19 @@ public class AvroIOTest {
ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
- p.apply(Create.of(values))
+ writePipeline.apply(Create.of(values))
.apply(
AvroIO.write(GenericClass.class)
.to(outputFile.getAbsolutePath())
.withoutSharding()
.withCodec(CodecFactory.nullCodec()));
- p.run();
+ writePipeline.run().waitUntilFinish();
PCollection<GenericClass> input =
- p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
+ readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
PAssert.that(input).containsInAnyOrder(values);
- p.run();
+ readPipeline.run();
DataFileStream dataFileStream =
new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader());
assertEquals("null", dataFileStream.getMetaString("avro.codec"));
@@ -261,18 +265,18 @@ public class AvroIOTest {
ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
- p.apply(Create.of(values))
+ writePipeline.apply(Create.of(values))
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
- p.run();
+ writePipeline.run().waitUntilFinish();
List<GenericClassV2> expected =
ImmutableList.of(new GenericClassV2(3, "hi", null), new GenericClassV2(5, "bar", null));
PCollection<GenericClassV2> input =
- p.apply(AvroIO.read(GenericClassV2.class).from(outputFile.getAbsolutePath()));
+ readPipeline.apply(AvroIO.read(GenericClassV2.class).from(outputFile.getAbsolutePath()));
PAssert.that(input).containsInAnyOrder(expected);
- p.run();
+ readPipeline.run();
}
private static class WindowedFilenamePolicy extends FilenamePolicy {
@@ -467,7 +471,7 @@ public class AvroIOTest {
ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
- p.apply(Create.of(values))
+ writePipeline.apply(Create.of(values))
.apply(
AvroIO.write(GenericClass.class)
.to(outputFile.getAbsolutePath())
@@ -480,7 +484,7 @@ public class AvroIOTest {
100L,
"bytesKey",
"bytesValue".getBytes())));
- p.run();
+ writePipeline.run();
DataFileStream dataFileStream =
new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader());
@@ -502,8 +506,8 @@ public class AvroIOTest {
System.out.println("no sharding");
write = write.withoutSharding();
}
- p.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write);
- p.run();
+ writePipeline.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write);
+ writePipeline.run();
String shardNameTemplate =
firstNonNull(