You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/12/28 19:41:37 UTC
[4/8] beam git commit: [BEAM-1186] Broke AvroIOGeneratedClassTest
into 2 parametrised test classes that support TestPipeline as a JUnit rule.
[BEAM-1186] Broke AvroIOGeneratedClassTest into 2 parametrised test classes that support TestPipeline as a JUnit rule.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b538574b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b538574b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b538574b
Branch: refs/heads/master
Commit: b538574bc6ac03a629be5034799c1021ee6dd3b0
Parents: 178371a
Author: Stas Levin <st...@gmail.com>
Authored: Wed Dec 21 17:58:35 2016 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Dec 28 11:40:32 2016 -0800
----------------------------------------------------------------------
.../beam/sdk/io/AvroIOGeneratedClassTest.java | 285 ----------------
.../apache/beam/sdk/io/AvroIOTransformTest.java | 335 +++++++++++++++++++
2 files changed, 335 insertions(+), 285 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b538574b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
deleted file mode 100644
index ede135f..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for AvroIO Read and Write transforms, using classes generated from {@code user.avsc}.
- */
-// TODO: Stop requiring local files
-@RunWith(JUnit4.class)
-public class AvroIOGeneratedClassTest {
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
- private File avroFile;
-
- @Before
- public void prepareAvroFileBeforeAnyTest() throws IOException {
- avroFile = tmpFolder.newFile("file.avro");
- }
-
- private final String schemaString =
- "{\"namespace\": \"example.avro\",\n"
- + " \"type\": \"record\",\n"
- + " \"name\": \"AvroGeneratedUser\",\n"
- + " \"fields\": [\n"
- + " {\"name\": \"name\", \"type\": \"string\"},\n"
- + " {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n"
- + " {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n"
- + " ]\n"
- + "}";
- private final Schema.Parser parser = new Schema.Parser();
- private final Schema schema = parser.parse(schemaString);
-
- private AvroGeneratedUser[] generateAvroObjects() {
- AvroGeneratedUser user1 = new AvroGeneratedUser();
- user1.setName("Bob");
- user1.setFavoriteNumber(256);
-
- AvroGeneratedUser user2 = new AvroGeneratedUser();
- user2.setName("Alice");
- user2.setFavoriteNumber(128);
-
- AvroGeneratedUser user3 = new AvroGeneratedUser();
- user3.setName("Ted");
- user3.setFavoriteColor("white");
-
- return new AvroGeneratedUser[] { user1, user2, user3 };
- }
-
- private GenericRecord[] generateAvroGenericRecords() {
- GenericRecord user1 = new GenericData.Record(schema);
- user1.put("name", "Bob");
- user1.put("favorite_number", 256);
-
- GenericRecord user2 = new GenericData.Record(schema);
- user2.put("name", "Alice");
- user2.put("favorite_number", 128);
-
- GenericRecord user3 = new GenericData.Record(schema);
- user3.put("name", "Ted");
- user3.put("favorite_color", "white");
-
- return new GenericRecord[] { user1, user2, user3 };
- }
-
- private void generateAvroFile(AvroGeneratedUser[] elements) throws IOException {
- DatumWriter<AvroGeneratedUser> userDatumWriter =
- new SpecificDatumWriter<>(AvroGeneratedUser.class);
- try (DataFileWriter<AvroGeneratedUser> dataFileWriter = new DataFileWriter<>(userDatumWriter)) {
- dataFileWriter.create(elements[0].getSchema(), avroFile);
- for (AvroGeneratedUser user : elements) {
- dataFileWriter.append(user);
- }
- }
- }
-
- private List<AvroGeneratedUser> readAvroFile() throws IOException {
- DatumReader<AvroGeneratedUser> userDatumReader =
- new SpecificDatumReader<>(AvroGeneratedUser.class);
- List<AvroGeneratedUser> users = new ArrayList<>();
- try (DataFileReader<AvroGeneratedUser> dataFileReader =
- new DataFileReader<>(avroFile, userDatumReader)) {
- while (dataFileReader.hasNext()) {
- users.add(dataFileReader.next());
- }
- }
- return users;
- }
-
- <T> void runTestRead(
- String applyName, AvroIO.Read.Bound<T> read, String expectedName, T[] expectedOutput)
- throws Exception {
- generateAvroFile(generateAvroObjects());
-
- TestPipeline p = TestPipeline.create();
- PCollection<T> output = p.apply(applyName, read);
- PAssert.that(output).containsInAnyOrder(expectedOutput);
- p.run();
- assertEquals(expectedName, output.getName());
- }
-
- <T> void runTestRead(AvroIO.Read.Bound<T> read, String expectedName, T[] expectedOutput)
- throws Exception {
- generateAvroFile(generateAvroObjects());
-
- TestPipeline p = TestPipeline.create();
- PCollection<T> output = p.apply(read);
- PAssert.that(output).containsInAnyOrder(expectedOutput);
- p.run();
- assertEquals(expectedName, output.getName());
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testReadFromGeneratedClass() throws Exception {
- runTestRead(
- AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
- "AvroIO.Read/Read.out",
- generateAvroObjects());
- runTestRead(
- AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()),
- "AvroIO.Read/Read.out",
- generateAvroObjects());
- runTestRead("MyRead",
- AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
- "MyRead/Read.out",
- generateAvroObjects());
- runTestRead("MyRead",
- AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()),
- "MyRead/Read.out",
- generateAvroObjects());
- runTestRead("HerRead",
- AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
- "HerRead/Read.out",
- generateAvroObjects());
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testReadFromSchema() throws Exception {
- runTestRead(
- AvroIO.Read.from(avroFile.getPath()).withSchema(schema),
- "AvroIO.Read/Read.out",
- generateAvroGenericRecords());
- runTestRead(
- AvroIO.Read.withSchema(schema).from(avroFile.getPath()),
- "AvroIO.Read/Read.out",
- generateAvroGenericRecords());
- runTestRead("MyRead",
- AvroIO.Read.from(avroFile.getPath()).withSchema(schema),
- "MyRead/Read.out",
- generateAvroGenericRecords());
- runTestRead("MyRead",
- AvroIO.Read.withSchema(schema).from(avroFile.getPath()),
- "MyRead/Read.out",
- generateAvroGenericRecords());
- runTestRead("HerRead",
- AvroIO.Read.from(avroFile.getPath()).withSchema(schema),
- "HerRead/Read.out",
- generateAvroGenericRecords());
- runTestRead("HerRead",
- AvroIO.Read.from(avroFile.getPath()).withSchema(schema),
- "HerRead/Read.out",
- generateAvroGenericRecords());
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testReadFromSchemaString() throws Exception {
- runTestRead(
- AvroIO.Read.from(avroFile.getPath()).withSchema(schemaString),
- "AvroIO.Read/Read.out",
- generateAvroGenericRecords());
- runTestRead(
- AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()),
- "AvroIO.Read/Read.out",
- generateAvroGenericRecords());
- runTestRead("MyRead",
- AvroIO.Read.from(avroFile.getPath()).withSchema(schemaString),
- "MyRead/Read.out",
- generateAvroGenericRecords());
- runTestRead("HerRead",
- AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()),
- "HerRead/Read.out",
- generateAvroGenericRecords());
- }
-
- <T> void runTestWrite(AvroIO.Write.Bound<T> write, String expectedName)
- throws Exception {
- AvroGeneratedUser[] users = generateAvroObjects();
-
- TestPipeline p = TestPipeline.create();
- @SuppressWarnings("unchecked")
- PCollection<T> input = p.apply(Create.of(Arrays.asList((T[]) users))
- .withCoder((Coder<T>) AvroCoder.of(AvroGeneratedUser.class)));
- input.apply(write.withoutSharding());
- p.run();
- assertEquals(expectedName, write.getName());
-
- assertThat(readAvroFile(), containsInAnyOrder(users));
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testWriteFromGeneratedClass() throws Exception {
- runTestWrite(
- AvroIO.Write.to(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
- "AvroIO.Write");
- runTestWrite(
- AvroIO.Write.withSchema(AvroGeneratedUser.class).to(avroFile.getPath()),
- "AvroIO.Write");
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testWriteFromSchema() throws Exception {
- runTestWrite(
- AvroIO.Write.to(avroFile.getPath()).withSchema(schema),
- "AvroIO.Write");
- runTestWrite(
- AvroIO.Write.withSchema(schema).to(avroFile.getPath()),
- "AvroIO.Write");
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testWriteFromSchemaString() throws Exception {
- runTestWrite(
- AvroIO.Write.to(avroFile.getPath()).withSchema(schemaString),
- "AvroIO.Write");
- runTestWrite(
- AvroIO.Write.withSchema(schemaString).to(avroFile.getPath()),
- "AvroIO.Write");
- }
-
- // TODO: for Write only, test withSuffix, withNumShards,
- // withShardNameTemplate and withoutSharding.
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b538574b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
new file mode 100644
index 0000000..cc1b3ad
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Suite;
+
+/**
+ * A test suite for {@link AvroIO.Write} and {@link AvroIO.Read} transforms.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ AvroIOTransformTest.AvroIOReadTransformTest.class,
+ AvroIOTransformTest.AvroIOWriteTransformTest.class
+})
+public class AvroIOTransformTest {
+
+ // TODO: Stop requiring local files
+
+ @Rule
+ public final transient TestPipeline pipeline = TestPipeline.create();
+
+ @Rule
+ public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ private static final Schema.Parser parser = new Schema.Parser(); // only used to build SCHEMA
+
+ private static final String SCHEMA_STRING = // JSON text of the AvroGeneratedUser record schema
+ "{\"namespace\": \"example.avro\",\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"AvroGeneratedUser\",\n"
+ + " \"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"},\n"
+ + " {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n"
+ + " {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n"
+ + " ]\n"
+ + "}";
+
+ private static final Schema SCHEMA = parser.parse(SCHEMA_STRING); // parsed form of SCHEMA_STRING
+
+ /** Builds the three fixture users (Bob, Alice, Ted) used by the read and write tests. */
+ private static AvroGeneratedUser[] generateAvroObjects() {
+ final AvroGeneratedUser bob = new AvroGeneratedUser();
+ bob.setName("Bob");
+ bob.setFavoriteNumber(256);
+ final AvroGeneratedUser alice = new AvroGeneratedUser();
+ alice.setName("Alice");
+ alice.setFavoriteNumber(128);
+ // Ted sets a favorite color rather than a favorite number.
+ final AvroGeneratedUser ted = new AvroGeneratedUser();
+ ted.setName("Ted");
+ ted.setFavoriteColor("white");
+
+ return new AvroGeneratedUser[] { bob, alice, ted };
+ }
+
+ /**
+ * Tests for AvroIO Read transforms, using classes generated from {@code user.avsc}.
+ */
+ @RunWith(Parameterized.class)
+ public static class AvroIOReadTransformTest extends AvroIOTransformTest {
+
+ /** Builds three SCHEMA-based records holding the same values as generateAvroObjects(). */
+ private static GenericRecord[] generateAvroGenericRecords() {
+ final GenericRecord bob = new GenericData.Record(SCHEMA);
+ bob.put("name", "Bob");
+ bob.put("favorite_number", 256);
+ final GenericRecord alice = new GenericData.Record(SCHEMA);
+ alice.put("name", "Alice");
+ alice.put("favorite_number", 128);
+ // Ted sets favorite_color rather than favorite_number.
+ final GenericRecord ted = new GenericData.Record(SCHEMA);
+ ted.put("name", "Ted");
+ ted.put("favorite_color", "white");
+
+ return new GenericRecord[] { bob, alice, ted };
+ }
+
+ /** Writes {@code elements} to {@code avroFile}, using the first element's schema. */
+ private void generateAvroFile(final AvroGeneratedUser[] elements,
+ final File avroFile) throws IOException {
+ final DatumWriter<AvroGeneratedUser> datumWriter =
+ new SpecificDatumWriter<>(AvroGeneratedUser.class);
+ try (DataFileWriter<AvroGeneratedUser> fileWriter = new DataFileWriter<>(datumWriter)) {
+ fileWriter.create(elements[0].getSchema(), avroFile);
+ for (final AvroGeneratedUser element : elements) {
+ fileWriter.append(element);
+ }
+ }
+ }
+
+ /**
+ * Writes the fixture users to a temp Avro file, reads it back with the given transform,
+ * and checks the output elements and name; an empty applyName applies it unnamed.
+ */
+ private <T> void runTestRead(final String applyName,
+ final AvroIO.Read.Bound<T> readBuilder,
+ final String expectedName,
+ final T[] expectedOutput) throws Exception {
+ final File inputFile = tmpFolder.newFile("file.avro");
+ generateAvroFile(generateAvroObjects(), inputFile);
+ final AvroIO.Read.Bound<T> boundRead = readBuilder.from(inputFile.getPath());
+ final PCollection<T> records =
+ applyName.isEmpty() ? pipeline.apply(boundRead) : pipeline.apply(applyName, boundRead);
+ PAssert.that(records).containsInAnyOrder(expectedOutput);
+ pipeline.run();
+ assertEquals(expectedName, records.getName());
+ }
+
+ /** Each row: apply name, read transform, expected output name, expected elements, alias. */
+ @Parameterized.Parameters(name = "{2}_with_{4}")
+ public static Iterable<Object[]> data() throws IOException {
+ final String classAlias = "GeneratedClass";
+ final String schemaAlias = "SchemaObject";
+ final String schemaStringAlias = "SchemaString";
+
+ return ImmutableList.<Object[]>builder()
+ .add(
+ new Object[] {
+ "",
+ AvroIO.Read.withSchema(AvroGeneratedUser.class),
+ "AvroIO.Read/Read.out",
+ generateAvroObjects(),
+ classAlias
+ },
+ new Object[] {
+ "MyRead",
+ AvroIO.Read.withSchema(AvroGeneratedUser.class),
+ "MyRead/Read.out",
+ generateAvroObjects(),
+ classAlias
+ },
+ new Object[] {
+ "HerRead",
+ AvroIO.Read.withSchema(AvroGeneratedUser.class),
+ "HerRead/Read.out",
+ generateAvroObjects(),
+ classAlias
+ },
+ new Object[] {
+ "",
+ AvroIO.Read.withSchema(SCHEMA),
+ "AvroIO.Read/Read.out",
+ generateAvroGenericRecords(),
+ schemaAlias
+ },
+ new Object[] {
+ "MyRead",
+ AvroIO.Read.withSchema(SCHEMA),
+ "MyRead/Read.out",
+ generateAvroGenericRecords(),
+ schemaAlias
+ },
+ new Object[] {
+ "HerRead",
+ AvroIO.Read.withSchema(SCHEMA),
+ "HerRead/Read.out",
+ generateAvroGenericRecords(),
+ schemaAlias
+ },
+ new Object[] {
+ "",
+ AvroIO.Read.withSchema(SCHEMA_STRING),
+ "AvroIO.Read/Read.out",
+ generateAvroGenericRecords(),
+ schemaStringAlias
+ },
+ new Object[] {
+ "MyRead",
+ AvroIO.Read.withSchema(SCHEMA_STRING),
+ "MyRead/Read.out",
+ generateAvroGenericRecords(),
+ schemaStringAlias
+ },
+ new Object[] {
+ "HerRead",
+ AvroIO.Read.withSchema(SCHEMA_STRING),
+ "HerRead/Read.out",
+ generateAvroGenericRecords(),
+ schemaStringAlias
+ })
+ .build();
+ }
+
+ @Parameterized.Parameter()
+ public String transformName; // passed to runTestRead as applyName; "" means apply unnamed
+
+ @Parameterized.Parameter(1)
+ public AvroIO.Read.Bound readTransform; // raw type (no type argument); schema already set
+
+ @Parameterized.Parameter(2)
+ public String expectedReadTransformName; // compared against the output PCollection's name
+
+ @Parameterized.Parameter(3)
+ public Object[] expectedOutput; // elements the read is asserted to contain, in any order
+
+ @Parameterized.Parameter(4)
+ public String testAlias; // schema-source label; only referenced by the test-name pattern
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testRead() throws Exception { // delegates to runTestRead with this row's values
+ runTestRead(transformName, readTransform, expectedReadTransformName, expectedOutput);
+ }
+ }
+
+ /**
+ * Tests for AvroIO Write transforms, using classes generated from {@code user.avsc}.
+ */
+ @RunWith(Parameterized.class)
+ public static class AvroIOWriteTransformTest extends AvroIOTransformTest {
+
+ private static final String WRITE_TRANSFORM_NAME = "AvroIO.Write"; // expected getName()
+
+ /** Reads every record in {@code avroFile} back as an {@link AvroGeneratedUser}. */
+ private List<AvroGeneratedUser> readAvroFile(final File avroFile) throws IOException {
+ final DatumReader<AvroGeneratedUser> decoder =
+ new SpecificDatumReader<>(AvroGeneratedUser.class);
+ final List<AvroGeneratedUser> decoded = new ArrayList<>();
+ try (DataFileReader<AvroGeneratedUser> reader = new DataFileReader<>(avroFile, decoder)) {
+ while (reader.hasNext()) {
+ decoded.add(reader.next());
+ }
+ }
+ return decoded;
+ }
+
+ /**
+ * Each row: a write transform with its schema already set, and a schema-source alias.
+ */
+ @Parameterized.Parameters(name = "{0}_with_{1}")
+ public static Iterable<Object[]> data() throws IOException {
+ final String classAlias = "GeneratedClass";
+ final String schemaAlias = "SchemaObject";
+ final String schemaStringAlias = "SchemaString";
+
+ return ImmutableList.<Object[]>builder()
+ .add(
+ new Object[] {
+ AvroIO.Write.withSchema(AvroGeneratedUser.class),
+ classAlias
+ },
+ new Object[] {
+ AvroIO.Write.withSchema(SCHEMA),
+ schemaAlias
+ },
+ new Object[] {
+ AvroIO.Write.withSchema(SCHEMA_STRING),
+ schemaStringAlias
+ })
+ .build();
+ }
+
+ @Parameterized.Parameter()
+ public AvroIO.Write.Bound writeTransform; // raw type (no type argument); schema already set
+
+ @Parameterized.Parameter(1)
+ public String testAlias; // schema-source label; only referenced by the test-name pattern
+
+ private <T> void runTestWrite(final AvroIO.Write.Bound<T> writeBuilder)
+ throws Exception {
+
+ final File avroFile = tmpFolder.newFile("file.avro");
+ final AvroGeneratedUser[] users = generateAvroObjects();
+ final AvroIO.Write.Bound<T> write = writeBuilder.to(avroFile.getPath());
+
+ @SuppressWarnings("unchecked") final
+ PCollection<T> input =
+ pipeline.apply(Create.of(Arrays.asList((T[]) users))
+ .withCoder((Coder<T>) AvroCoder.of(AvroGeneratedUser.class)));
+ input.apply(write.withoutSharding());
+
+ pipeline.run();
+
+ assertEquals(WRITE_TRANSFORM_NAME, write.getName());
+ assertThat(readAvroFile(avroFile), containsInAnyOrder(users));
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testWrite() throws Exception { // delegates to runTestWrite with this row's transform
+ runTestWrite(writeTransform);
+ }
+
+ // TODO: for Write only, test withSuffix, withNumShards,
+ // withShardNameTemplate and withoutSharding.
+ }
+}