You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/02 01:46:05 UTC
[10/11] beam git commit: Moves AvroIO.Read.withSchema into read()
Moves AvroIO.Read.withSchema into read()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/abb4916c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/abb4916c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/abb4916c
Branch: refs/heads/master
Commit: abb4916ce2fa8d4a5caf783b66cc5541053ea83c
Parents: d1dfd4e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 19:03:25 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 35 ++++++++------------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 24 ++++++--------
.../apache/beam/sdk/io/AvroIOTransformTest.java | 4 +--
3 files changed, 25 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/abb4916c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 4bde6ec..08fc8a9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -46,16 +46,15 @@ import org.apache.beam.sdk.values.PDone;
* {@link PTransform}s for reading and writing Avro files.
*
* <p>To read a {@link PCollection} from one or more Avro files, use
- * {@link AvroIO.Read}, specifying {@link AvroIO.Read#from} to specify
+ * {@code AvroIO.read()}, specifying {@link AvroIO.Read#from} to specify
* the path of the file(s) to read from (e.g., a local filename or
* filename pattern if running locally, or a Google Cloud Storage
* filename or filename pattern of the form {@code "gs://<bucket>/<filepath>"}).
*
- * <p>It is required to specify {@link AvroIO.Read#withSchema}. To
- * read specific records, such as Avro-generated classes, provide an
- * Avro-generated class type. To read {@link GenericRecord GenericRecords}, provide either
- * a {@link Schema} object or an Avro schema in a JSON-encoded string form.
- * An exception will be thrown if a record doesn't match the specified
+ * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}.
+ * To read {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes
+ * a {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
+ * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
* schema.
*
* <p>For example:
@@ -64,15 +63,13 @@ import org.apache.beam.sdk.values.PDone;
*
* // A simple Read of a local file (only runs locally):
* PCollection<AvroAutoGenClass> records =
- * p.apply(AvroIO.read().from("/path/to/file.avro")
- * .withSchema(AvroAutoGenClass.class));
+ * p.apply(AvroIO.read(AvroAutoGenClass.class).from("/path/to/file.avro"));
*
* // A Read from a GCS file (runs locally and using remote execution):
* Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
* PCollection<GenericRecord> records =
- * p.apply(AvroIO.read()
- * .from("gs://my_bucket/path/to/records-*.avro")
- * .withSchema(schema));
+ * p.apply(AvroIO.readGenericRecords(schema)
+ * .from("gs://my_bucket/path/to/records-*.avro"));
* } </pre>
*
* <p>To write a {@link PCollection} to one or more Avro files, use
@@ -130,8 +127,11 @@ public class AvroIO {
*
* <p>The schema must be specified using one of the {@code withSchema} functions.
*/
- public static <T> Read<T> read() {
- return new AutoValue_AvroIO_Read.Builder<T>().build();
+ public static <T> Read<T> read(Class<T> recordClass) {
+ return new AutoValue_AvroIO_Read.Builder<T>()
+ .setRecordClass(recordClass)
+ .setSchema(ReflectData.get().getSchema(recordClass))
+ .build();
}
/** Reads Avro file(s) containing records of the specified schema. */
@@ -188,15 +188,6 @@ public class AvroIO {
return toBuilder().setFilepattern(filepattern).build();
}
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that reads Avro file(s) containing records whose type is the
- * specified Avro-generated class.
- */
- public Read<T> withSchema(Class<T> type) {
- return toBuilder().setRecordClass(type).setSchema(ReflectData.get().getSchema(type)).build();
- }
-
@Override
public PCollection<T> expand(PBegin input) {
if (getFilepattern() == null) {
http://git-wip-us.apache.org/repos/asf/beam/blob/abb4916c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 7df1b18..38984b5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -102,7 +102,7 @@ public class AvroIOTest {
@Test
public void testAvroIOGetName() {
- assertEquals("AvroIO.Read", AvroIO.read().from("gs://bucket/foo*/baz").getName());
+ assertEquals("AvroIO.Read", AvroIO.read(String.class).from("gs://bucket/foo*/baz").getName());
assertEquals("AvroIO.Write", AvroIO.write().to("gs://bucket/foo/baz").getName());
}
@@ -151,9 +151,8 @@ public class AvroIOTest {
PCollection<GenericClass> input =
p.apply(
- AvroIO.<GenericClass>read()
- .from(outputFile.getAbsolutePath())
- .withSchema(GenericClass.class));
+ AvroIO.read(GenericClass.class)
+ .from(outputFile.getAbsolutePath()));
PAssert.that(input).containsInAnyOrder(values);
p.run();
@@ -175,9 +174,8 @@ public class AvroIOTest {
p.run();
PCollection<GenericClass> input = p
- .apply(AvroIO.<GenericClass>read()
- .from(outputFile.getAbsolutePath())
- .withSchema(GenericClass.class));
+ .apply(AvroIO.read(GenericClass.class)
+ .from(outputFile.getAbsolutePath()));
PAssert.that(input).containsInAnyOrder(values);
p.run();
@@ -202,9 +200,8 @@ public class AvroIOTest {
p.run();
PCollection<GenericClass> input = p
- .apply(AvroIO.<GenericClass>read()
- .from(outputFile.getAbsolutePath())
- .withSchema(GenericClass.class));
+ .apply(AvroIO.read(GenericClass.class)
+ .from(outputFile.getAbsolutePath()));
PAssert.that(input).containsInAnyOrder(values);
p.run();
@@ -273,9 +270,8 @@ public class AvroIOTest {
PCollection<GenericClassV2> input =
p.apply(
- AvroIO.<GenericClassV2>read()
- .from(outputFile.getAbsolutePath())
- .withSchema(GenericClassV2.class));
+ AvroIO.read(GenericClassV2.class)
+ .from(outputFile.getAbsolutePath()));
PAssert.that(input).containsInAnyOrder(expected);
p.run();
@@ -535,7 +531,7 @@ public class AvroIOTest {
@Test
public void testReadDisplayData() {
- AvroIO.Read<?> read = AvroIO.read().from("foo.*");
+ AvroIO.Read<?> read = AvroIO.read(String.class).from("foo.*");
DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
http://git-wip-us.apache.org/repos/asf/beam/blob/abb4916c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index ba7f1b9..51c9691 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -169,14 +169,14 @@ public class AvroIOTransformTest {
// test read using generated class
new Object[] {
null,
- AvroIO.<AvroGeneratedUser>read().withSchema(AvroGeneratedUser.class),
+ AvroIO.read(AvroGeneratedUser.class),
"AvroIO.Read/Read.out",
generateAvroObjects(),
generatedClass
},
new Object[] {
"MyRead",
- AvroIO.<AvroGeneratedUser>read().withSchema(AvroGeneratedUser.class),
+ AvroIO.read(AvroGeneratedUser.class),
"MyRead/Read.out",
generateAvroObjects(),
generatedClass