You are viewing a plain-text version of this content. (The link to the canonical version was lost when this page was converted to plain text.)
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/02 01:46:01 UTC
[06/11] beam git commit: Removes AvroIO.Read.Bound
Removes AvroIO.Read.Bound
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1499d256
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1499d256
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1499d256
Branch: refs/heads/master
Commit: 1499d256c616e34b4416fa202a45aa256ac88d20
Parents: 0166e19
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 18:19:21 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700
----------------------------------------------------------------------
.../beam/runners/spark/io/AvroPipelineTest.java | 2 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 222 +++++++------------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 24 +-
.../apache/beam/sdk/io/AvroIOTransformTest.java | 18 +-
4 files changed, 108 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 2a73c28..e3a44d2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -74,7 +74,7 @@ public class AvroPipelineTest {
Pipeline p = pipelineRule.createPipeline();
PCollection<GenericRecord> input = p.apply(
- AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
+ AvroIO.read().from(inputFile.getAbsolutePath()).withSchema(schema));
input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
p.run().waitUntilFinish();
http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 75e14d5..abde9cb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -63,7 +63,7 @@ import org.apache.beam.sdk.values.PDone;
*
* // A simple Read of a local file (only runs locally):
* PCollection<AvroAutoGenClass> records =
- * p.apply(AvroIO.Read.from("/path/to/file.avro")
+ * p.apply(AvroIO.read().from("/path/to/file.avro")
* .withSchema(AvroAutoGenClass.class));
*
* // A Read from a GCS file (runs locally and using remote execution):
@@ -125,15 +125,39 @@ import org.apache.beam.sdk.values.PDone;
*/
public class AvroIO {
/**
- * A root {@link PTransform} that reads from an Avro file (or multiple Avro
- * files matching a pattern) and returns a {@link PCollection} containing
- * the decoding of each record.
+ * Reads records of the given type from an Avro file (or multiple Avro files matching a pattern).
+ *
+ * <p>The schema must be specified using one of the {@code withSchema} functions.
*/
- public static class Read {
+ public static <T> Read<T> read() {
+ return new Read<>();
+ }
+
+ /** Implementation of {@link #read}. */
+ public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+ /** The filepattern to read from. */
+ @Nullable
+ final String filepattern;
+ /** The class type of the records. */
+ @Nullable
+ final Class<T> type;
+ /** The schema of the input file. */
+ @Nullable
+ final Schema schema;
+
+ Read() {
+ this(null, null, null, null);
+ }
+
+ Read(String name, String filepattern, Class<T> type, Schema schema) {
+ super(name);
+ this.filepattern = filepattern;
+ this.type = type;
+ this.schema = schema;
+ }
/**
- * Returns a {@link PTransform} that reads from the file(s)
- * with the given name or pattern. This can be a local filename
+ * Reads from the file(s) with the given name or pattern. This can be a local filename
* or filename pattern (if running locally), or a Google Cloud
* Storage filename or filename pattern of the form
* {@code "gs://<bucket>/<filepath>"} (if running locally or
@@ -141,162 +165,82 @@ public class AvroIO {
* <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html">Java
* Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
*/
- public static Bound<GenericRecord> from(String filepattern) {
- return new Bound<>(GenericRecord.class).from(filepattern);
+ public Read<T> from(String filepattern) {
+ return new Read<>(name, filepattern, type, schema);
}
/**
- * Returns a {@link PTransform} that reads Avro file(s)
- * containing records whose type is the specified Avro-generated class.
- *
- * @param <T> the type of the decoded elements, and the elements
- * of the resulting {@link PCollection}
+ * Returns a new {@link PTransform} that's like this one but
+ * that reads Avro file(s) containing records whose type is the
+ * specified Avro-generated class.
*/
- public static <T> Bound<T> withSchema(Class<T> type) {
- return new Bound<>(type).withSchema(type);
+ public Read<T> withSchema(Class<T> type) {
+ return new Read<>(name, filepattern, type, ReflectData.get().getSchema(type));
}
/**
- * Returns a {@link PTransform} that reads Avro file(s)
- * containing records of the specified schema.
+ * Returns a new {@link PTransform} that's like this one but
+ * that reads Avro file(s) containing records of the specified schema.
*/
- public static Bound<GenericRecord> withSchema(Schema schema) {
- return new Bound<>(GenericRecord.class).withSchema(schema);
+ public Read<GenericRecord> withSchema(Schema schema) {
+ return new Read<>(name, filepattern, GenericRecord.class, schema);
}
/**
- * Returns a {@link PTransform} that reads Avro file(s)
- * containing records of the specified schema in a JSON-encoded
- * string form.
+ * Returns a new {@link PTransform} that's like this one but
+ * that reads Avro file(s) containing records of the specified schema
+ * in a JSON-encoded string form.
+ *
+ * <p>Does not modify this object.
*/
- public static Bound<GenericRecord> withSchema(String schema) {
+ public Read<GenericRecord> withSchema(String schema) {
return withSchema((new Schema.Parser()).parse(schema));
}
- /**
- * A {@link PTransform} that reads from an Avro file (or multiple Avro
- * files matching a pattern) and returns a bounded {@link PCollection} containing
- * the decoding of each record.
- *
- * @param <T> the type of each of the elements of the resulting
- * PCollection
- */
- public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
- /** The filepattern to read from. */
- @Nullable
- final String filepattern;
- /** The class type of the records. */
- final Class<T> type;
- /** The schema of the input file. */
- @Nullable
- final Schema schema;
-
- Bound(Class<T> type) {
- this(null, null, type, null);
+ @Override
+ public PCollection<T> expand(PBegin input) {
+ if (filepattern == null) {
+ throw new IllegalStateException(
+ "need to set the filepattern of an AvroIO.Read transform");
}
-
- Bound(String name, String filepattern, Class<T> type, Schema schema) {
- super(name);
- this.filepattern = filepattern;
- this.type = type;
- this.schema = schema;
+ if (schema == null) {
+ throw new IllegalStateException("need to set the schema of an AvroIO.Read transform");
}
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that reads from the file(s) with the given name or pattern.
- * (See {@link AvroIO.Read#from} for a description of
- * filepatterns.)
- *
- * <p>Does not modify this object.
- */
- public Bound<T> from(String filepattern) {
- return new Bound<>(name, filepattern, type, schema);
- }
+ @SuppressWarnings("unchecked")
+ Bounded<T> read =
+ type == GenericRecord.class
+ ? (Bounded<T>) org.apache.beam.sdk.io.Read.from(
+ AvroSource.from(filepattern).withSchema(schema))
+ : org.apache.beam.sdk.io.Read.from(
+ AvroSource.from(filepattern).withSchema(type));
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that reads Avro file(s) containing records whose type is the
- * specified Avro-generated class.
- *
- * <p>Does not modify this object.
- *
- * @param <X> the type of the decoded elements and the elements of
- * the resulting PCollection
- */
- public <X> Bound<X> withSchema(Class<X> type) {
- return new Bound<>(name, filepattern, type, ReflectData.get().getSchema(type));
- }
-
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that reads Avro file(s) containing records of the specified schema.
- *
- * <p>Does not modify this object.
- */
- public Bound<GenericRecord> withSchema(Schema schema) {
- return new Bound<>(name, filepattern, GenericRecord.class, schema);
- }
-
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that reads Avro file(s) containing records of the specified schema
- * in a JSON-encoded string form.
- *
- * <p>Does not modify this object.
- */
- public Bound<GenericRecord> withSchema(String schema) {
- return withSchema((new Schema.Parser()).parse(schema));
- }
-
- @Override
- public PCollection<T> expand(PBegin input) {
- if (filepattern == null) {
- throw new IllegalStateException(
- "need to set the filepattern of an AvroIO.Read transform");
- }
- if (schema == null) {
- throw new IllegalStateException("need to set the schema of an AvroIO.Read transform");
- }
-
- @SuppressWarnings("unchecked")
- Bounded<T> read =
- type == GenericRecord.class
- ? (Bounded<T>) org.apache.beam.sdk.io.Read.from(
- AvroSource.from(filepattern).withSchema(schema))
- : org.apache.beam.sdk.io.Read.from(
- AvroSource.from(filepattern).withSchema(type));
-
- PCollection<T> pcol = input.getPipeline().apply("Read", read);
- // Honor the default output coder that would have been used by this PTransform.
- pcol.setCoder(getDefaultOutputCoder());
- return pcol;
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .addIfNotNull(DisplayData.item("filePattern", filepattern)
- .withLabel("Input File Pattern"));
- }
+ PCollection<T> pcol = input.getPipeline().apply("Read", read);
+ // Honor the default output coder that would have been used by this PTransform.
+ pcol.setCoder(getDefaultOutputCoder());
+ return pcol;
+ }
- @Override
- protected Coder<T> getDefaultOutputCoder() {
- return AvroCoder.of(type, schema);
- }
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .addIfNotNull(DisplayData.item("filePattern", filepattern)
+ .withLabel("Input File Pattern"));
+ }
- public String getFilepattern() {
- return filepattern;
- }
+ @Override
+ protected Coder<T> getDefaultOutputCoder() {
+ return AvroCoder.of(type, schema);
+ }
- public Schema getSchema() {
- return schema;
- }
+ public String getFilepattern() {
+ return filepattern;
}
- /** Disallow construction of utility class. */
- private Read() {}
+ public Schema getSchema() {
+ return schema;
+ }
}
/////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index ece7997..6d842b3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -103,7 +103,7 @@ public class AvroIOTest {
@Test
public void testAvroIOGetName() {
- assertEquals("AvroIO.Read", AvroIO.Read.from("gs://bucket/foo*/baz").getName());
+ assertEquals("AvroIO.Read", AvroIO.read().from("gs://bucket/foo*/baz").getName());
assertEquals("AvroIO.Write", AvroIO.Write.to("gs://bucket/foo/baz").getName());
}
@@ -150,8 +150,11 @@ public class AvroIOTest {
.withSchema(GenericClass.class));
p.run();
- PCollection<GenericClass> input = p
- .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClass.class));
+ PCollection<GenericClass> input =
+ p.apply(
+ AvroIO.<GenericClass>read()
+ .from(outputFile.getAbsolutePath())
+ .withSchema(GenericClass.class));
PAssert.that(input).containsInAnyOrder(values);
p.run();
@@ -173,7 +176,7 @@ public class AvroIOTest {
p.run();
PCollection<GenericClass> input = p
- .apply(AvroIO.Read
+ .apply(AvroIO.<GenericClass>read()
.from(outputFile.getAbsolutePath())
.withSchema(GenericClass.class));
@@ -200,7 +203,7 @@ public class AvroIOTest {
p.run();
PCollection<GenericClass> input = p
- .apply(AvroIO.Read
+ .apply(AvroIO.<GenericClass>read()
.from(outputFile.getAbsolutePath())
.withSchema(GenericClass.class));
@@ -269,8 +272,11 @@ public class AvroIOTest {
List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null),
new GenericClassV2(5, "bar", null));
- PCollection<GenericClassV2> input = p
- .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClassV2.class));
+ PCollection<GenericClassV2> input =
+ p.apply(
+ AvroIO.<GenericClassV2>read()
+ .from(outputFile.getAbsolutePath())
+ .withSchema(GenericClassV2.class));
PAssert.that(input).containsInAnyOrder(expected);
p.run();
@@ -533,7 +539,7 @@ public class AvroIOTest {
@Test
public void testReadDisplayData() {
- AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*");
+ AvroIO.Read<?> read = AvroIO.read().from("foo.*");
DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
@@ -544,7 +550,7 @@ public class AvroIOTest {
public void testPrimitiveReadDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*")
+ AvroIO.Read<?> read = AvroIO.read().from("foo.*")
.withSchema(Schema.create(Schema.Type.STRING));
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index 3cf52a4..06b9841 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -138,13 +138,13 @@ public class AvroIOTransformTest {
}
private <T> void runTestRead(@Nullable final String applyName,
- final AvroIO.Read.Bound<T> readBuilder,
+ final AvroIO.Read<T> readBuilder,
final String expectedName,
final T[] expectedOutput) throws Exception {
final File avroFile = tmpFolder.newFile("file.avro");
generateAvroFile(generateAvroObjects(), avroFile);
- final AvroIO.Read.Bound<T> read = readBuilder.from(avroFile.getPath());
+ final AvroIO.Read<T> read = readBuilder.from(avroFile.getPath());
final PCollection<T> output =
applyName == null ? pipeline.apply(read) : pipeline.apply(applyName, read);
@@ -169,14 +169,14 @@ public class AvroIOTransformTest {
// test read using generated class
new Object[] {
null,
- AvroIO.Read.withSchema(AvroGeneratedUser.class),
+ AvroIO.<AvroGeneratedUser>read().withSchema(AvroGeneratedUser.class),
"AvroIO.Read/Read.out",
generateAvroObjects(),
generatedClass
},
new Object[] {
"MyRead",
- AvroIO.Read.withSchema(AvroGeneratedUser.class),
+ AvroIO.<AvroGeneratedUser>read().withSchema(AvroGeneratedUser.class),
"MyRead/Read.out",
generateAvroObjects(),
generatedClass
@@ -185,14 +185,14 @@ public class AvroIOTransformTest {
// test read using schema object
new Object[] {
null,
- AvroIO.Read.withSchema(SCHEMA),
+ AvroIO.read().withSchema(SCHEMA),
"AvroIO.Read/Read.out",
generateAvroGenericRecords(),
fromSchema
},
new Object[] {
"MyRead",
- AvroIO.Read.withSchema(SCHEMA),
+ AvroIO.read().withSchema(SCHEMA),
"MyRead/Read.out",
generateAvroGenericRecords(),
fromSchema
@@ -201,14 +201,14 @@ public class AvroIOTransformTest {
// test read using schema string
new Object[] {
null,
- AvroIO.Read.withSchema(SCHEMA_STRING),
+ AvroIO.read().withSchema(SCHEMA_STRING),
"AvroIO.Read/Read.out",
generateAvroGenericRecords(),
fromSchemaString
},
new Object[] {
"MyRead",
- AvroIO.Read.withSchema(SCHEMA_STRING),
+ AvroIO.read().withSchema(SCHEMA_STRING),
"MyRead/Read.out",
generateAvroGenericRecords(),
fromSchemaString
@@ -221,7 +221,7 @@ public class AvroIOTransformTest {
public String transformName;
@Parameterized.Parameter(1)
- public AvroIO.Read.Bound readTransform;
+ public AvroIO.Read readTransform;
@Parameterized.Parameter(2)
public String expectedReadTransformName;