You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/02 01:45:56 UTC
[01/11] beam git commit: Scattered minor improvements per review comments
Repository: beam
Updated Branches:
refs/heads/master 6d443bc39 -> 034565c68
Scattered minor improvements per review comments
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/caf2faeb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/caf2faeb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/caf2faeb
Branch: refs/heads/master
Commit: caf2faeb5e0b173f4e40f4af70c14d1d5d4244e4
Parents: 27d7462
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon May 1 17:00:46 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 81 +++++++++-----------
.../java/org/apache/beam/sdk/io/AvroSink.java | 14 +---
.../java/org/apache/beam/sdk/io/AvroSource.java | 4 +-
.../beam/sdk/testing/SourceTestUtils.java | 5 +-
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 44 ++++++-----
5 files changed, 69 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 6b66a98..755cdb9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -33,6 +33,7 @@ import org.apache.avro.reflect.ReflectData;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.Read.Bounded;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
@@ -45,11 +46,9 @@ import org.apache.beam.sdk.values.PDone;
/**
* {@link PTransform}s for reading and writing Avro files.
*
- * <p>To read a {@link PCollection} from one or more Avro files, use
- * {@code AvroIO.read()}, specifying {@link AvroIO.Read#from} to specify
- * the path of the file(s) to read from (e.g., a local filename or
- * filename pattern if running locally, or a Google Cloud Storage
- * filename or filename pattern of the form {@code "gs://<bucket>/<filepath>"}).
+ * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()},
+ * specifying {@link AvroIO.Read#from} to specify the filename or filepattern to read from.
+ * See {@link FileSystems} for information on supported file systems and filepatterns.
*
* <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}.
* To read {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes
@@ -72,13 +71,12 @@ import org.apache.beam.sdk.values.PDone;
* .from("gs://my_bucket/path/to/records-*.avro"));
* } </pre>
*
- * <p>To write a {@link PCollection} to one or more Avro files, use
- * {@link AvroIO.Write}, specifying {@code AvroIO.write().to(String)} to specify
- * the path of the file to write to (e.g., a local filename or sharded
- * filename pattern if running locally, or a Google Cloud Storage
- * filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}). {@code AvroIO.write().to(FileBasedSink.FilenamePolicy)}
- * can also be used to specify a custom file naming policy.
+ * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, specifying
+ * {@code AvroIO.write().to(String)} to specify the filename or sharded filepattern to write to.
+ * See {@link FileSystems} for information on supported file systems and {@link ShardNameTemplate}
+ * for information on naming of output files. You can also use {@code AvroIO.write()} with
+ * {@link Write#to(FileBasedSink.FilenamePolicy)} to
+ * specify a custom file naming policy.
*
* <p>By default, all input is put into the global window before writing. If per-window writes are
* desired - for example, when using a streaming runner -
@@ -140,7 +138,8 @@ public class AvroIO {
}
/**
- * Like {@link #readGenericRecords(Schema)} but the schema is specified as a JSON-encoded string.
+ * Reads Avro file(s) containing records of the specified schema. The schema is specified as a
+ * JSON-encoded string.
*/
public static Read<GenericRecord> readGenericRecords(String schema) {
return readGenericRecords(new Schema.Parser().parse(schema));
@@ -165,6 +164,13 @@ public class AvroIO {
.build();
}
+ /**
+ * Writes Avro records of the specified schema. The schema is specified as a JSON-encoded string.
+ */
+ public static Write<GenericRecord> writeGenericRecords(String schema) {
+ return writeGenericRecords(new Schema.Parser().parse(schema));
+ }
+
private static <T> Write.Builder<T> defaultWriteBuilder() {
return new AutoValue_AvroIO_Write.Builder<T>()
.setFilenameSuffix("")
@@ -175,13 +181,6 @@ public class AvroIO {
.setWindowedWrites(false);
}
- /**
- * Like {@link #writeGenericRecords(Schema)} but the schema is specified as a JSON-encoded string.
- */
- public static Write<GenericRecord> writeGenericRecords(String schema) {
- return writeGenericRecords(new Schema.Parser().parse(schema));
- }
-
/** Implementation of {@link #read}. */
@AutoValue
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
@@ -200,15 +199,7 @@ public class AvroIO {
abstract Read<T> build();
}
- /**
- * Reads from the file(s) with the given name or pattern. This can be a local filename
- * or filename pattern (if running locally), or a Google Cloud
- * Storage filename or filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"} (if running locally or
- * using remote execution). Standard
- * <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html">Java
- * Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
- */
+ /** Reads from the given filename or filepattern. */
public Read<T> from(String filepattern) {
return toBuilder().setFilepattern(filepattern).build();
}
@@ -275,7 +266,7 @@ public class AvroIO {
abstract Class<T> getRecordClass();
@Nullable abstract Schema getSchema();
abstract boolean getWindowedWrites();
- @Nullable abstract FileBasedSink.FilenamePolicy getFilenamePolicy();
+ @Nullable abstract FilenamePolicy getFilenamePolicy();
/**
* The codec used to encode the blocks in the Avro file. String value drawn from those in
* https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
@@ -295,7 +286,7 @@ public class AvroIO {
abstract Builder<T> setRecordClass(Class<T> recordClass);
abstract Builder<T> setSchema(Schema schema);
abstract Builder<T> setWindowedWrites(boolean windowedWrites);
- abstract Builder<T> setFilenamePolicy(FileBasedSink.FilenamePolicy filenamePolicy);
+ abstract Builder<T> setFilenamePolicy(FilenamePolicy filenamePolicy);
abstract Builder<T> setCodec(SerializableAvroCodecFactory codec);
abstract Builder<T> setMetadata(ImmutableMap<String, Object> metadata);
@@ -303,10 +294,8 @@ public class AvroIO {
}
/**
- * Writes to the file(s) with the given prefix. This can be a local filename
- * (if running locally), or a Google Cloud Storage filename of
- * the form {@code "gs://<bucket>/<filepath>"}
- * (if running locally or using remote execution).
+ * Writes to the file(s) with the given prefix. See {@link FileSystems} for information on
+ * supported file systems.
*
* <p>The files written will begin with this prefix, followed by
* a shard identifier (see {@link #withNumShards}), and end
@@ -318,7 +307,7 @@ public class AvroIO {
}
/** Writes to the file(s) specified by the provided {@link FileBasedSink.FilenamePolicy}. */
- public Write<T> to(FileBasedSink.FilenamePolicy filenamePolicy) {
+ public Write<T> to(FilenamePolicy filenamePolicy) {
return toBuilder().setFilenamePolicy(filenamePolicy).build();
}
@@ -333,7 +322,8 @@ public class AvroIO {
}
/**
- * Uses the provided shard count.
+ * Uses the provided shard count. See {@link ShardNameTemplate} for a description of shard
+ * templates.
*
* <p>Constraining the number of shards is likely to reduce
* the performance of a pipeline. Setting this value is not recommended
@@ -341,19 +331,13 @@ public class AvroIO {
*
* @param numShards the number of shards to use, or 0 to let the system
* decide.
- * @see ShardNameTemplate
*/
public Write<T> withNumShards(int numShards) {
checkArgument(numShards >= 0);
return toBuilder().setNumShards(numShards).build();
}
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that uses the given shard name template.
- *
- * @see ShardNameTemplate
- */
+ /** Uses the given {@link ShardNameTemplate} for naming output files. */
public Write<T> withShardNameTemplate(String shardTemplate) {
return toBuilder().setShardTemplate(shardTemplate).build();
}
@@ -361,12 +345,19 @@ public class AvroIO {
/**
* Forces a single file as output.
*
- * <p>This is a shortcut for {@code .withNumShards(1).withShardNameTemplate("")}
+ * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}.
*/
public Write<T> withoutSharding() {
return withNumShards(1).withShardNameTemplate("");
}
+ /**
+ * Preserves windowing of input elements and writes them to files based on the element's window.
+ *
+ * <p>Requires use of {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated
+ * using {@link FilenamePolicy#windowedFilename(FileBasedSink.FilenamePolicy.WindowedContext)}.
+ * See also {@link WriteFiles#withWindowedWrites()}.
+ */
public Write<T> withWindowedWrites() {
return toBuilder().setWindowedWrites(true).build();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
index 16f233c..46bb4f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -30,9 +30,7 @@ import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.MimeTypes;
-/**
- * A {@link FileBasedSink} for Avro files.
- */
+/** A {@link FileBasedSink} for Avro files. */
class AvroSink<T> extends FileBasedSink<T> {
private final AvroCoder<T> coder;
private final SerializableAvroCodecFactory codec;
@@ -67,10 +65,7 @@ class AvroSink<T> extends FileBasedSink<T> {
return new AvroWriteOperation<>(this, coder, codec, metadata);
}
- /**
- * A {@link FileBasedWriteOperation
- * FileBasedWriteOperation} for Avro files.
- */
+ /** A {@link FileBasedWriteOperation FileBasedWriteOperation} for Avro files. */
private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T> {
private final AvroCoder<T> coder;
private final SerializableAvroCodecFactory codec;
@@ -92,10 +87,7 @@ class AvroSink<T> extends FileBasedSink<T> {
}
}
- /**
- * A {@link FileBasedWriter FileBasedWriter}
- * for Avro files.
- */
+ /** A {@link FileBasedWriter FileBasedWriter} for Avro files. */
private static class AvroWriter<T> extends FileBasedWriter<T> {
private final AvroCoder<T> coder;
private DataFileWriter<T> dataFileWriter;
http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 58e6555..96d21c6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -62,7 +62,9 @@ import org.apache.commons.compress.utils.CountingInputStream;
// CHECKSTYLE.OFF: JavadocStyle
/**
- * A {@link FileBasedSource} for reading Avro files.
+ * Do not use in pipelines directly: most users should use {@link AvroIO.Read}.
+ *
+ * <p>A {@link FileBasedSource} for reading Avro files.
*
* <p>To read a {@link PCollection} of objects from one or more Avro files, use
* {@link AvroSource#from} to specify the path(s) of the files to read. The {@link AvroSource} that
http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
index fd7ae85..cde0b94 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
@@ -72,9 +72,8 @@ import org.slf4j.LoggerFactory;
* as a heavy-weight stress test including concurrency. We strongly recommend to
* use both.
* </ul>
- * For example usages, see the unit tests of classes such as
- * {@link org.apache.beam.sdk.io.AvroSource} or
- * {@link org.apache.beam.sdk.io.TextIO TextIO.TextSource}.
+ * For example usages, see the unit tests of classes such as {@code AvroSource} or
+ * {@code TextSource}.
*
* <p>Like {@link PAssert}, requires JUnit and Hamcrest to be present in the classpath.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index e421b96..d14d9b2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -45,6 +45,7 @@ import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.Nullable;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
@@ -143,8 +144,9 @@ public class AvroIOTest {
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
- .withoutSharding());
+ .apply(AvroIO.write(GenericClass.class)
+ .to(outputFile.getAbsolutePath())
+ .withoutSharding());
p.run();
PCollection<GenericClass> input =
@@ -165,9 +167,10 @@ public class AvroIOTest {
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
- .withoutSharding()
- .withCodec(CodecFactory.deflateCodec(9)));
+ .apply(AvroIO.write(GenericClass.class)
+ .to(outputFile.getAbsolutePath())
+ .withoutSharding()
+ .withCodec(CodecFactory.deflateCodec(9)));
p.run();
PCollection<GenericClass> input = p
@@ -190,9 +193,10 @@ public class AvroIOTest {
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
- .withoutSharding()
- .withCodec(CodecFactory.nullCodec()));
+ .apply(AvroIO.write(GenericClass.class)
+ .to(outputFile.getAbsolutePath())
+ .withoutSharding()
+ .withCodec(CodecFactory.nullCodec()));
p.run();
PCollection<GenericClass> input = p
@@ -256,7 +260,8 @@ public class AvroIOTest {
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
+ .apply(AvroIO.write(GenericClass.class)
+ .to(outputFile.getAbsolutePath())
.withoutSharding());
p.run();
@@ -362,7 +367,8 @@ public class AvroIOTest {
windowedAvroWritePipeline
.apply(values)
.apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
- .apply(AvroIO.write(GenericClass.class).to(new WindowedFilenamePolicy(outputFilePrefix))
+ .apply(AvroIO.write(GenericClass.class)
+ .to(new WindowedFilenamePolicy(outputFilePrefix))
.withWindowedWrites()
.withNumShards(2));
windowedAvroWritePipeline.run();
@@ -403,7 +409,7 @@ public class AvroIOTest {
@Test
public void testWriteWithCustomCodec() throws Exception {
- AvroIO.Write<?> write = AvroIO.write(String.class)
+ AvroIO.Write<String> write = AvroIO.write(String.class)
.to("gs://bucket/foo/baz")
.withCodec(CodecFactory.snappyCodec());
assertEquals(SNAPPY_CODEC, write.getCodec().toString());
@@ -442,7 +448,8 @@ public class AvroIOTest {
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
+ .apply(AvroIO.write(GenericClass.class)
+ .to(outputFile.getAbsolutePath())
.withoutSharding()
.withMetadata(ImmutableMap.<String, Object>of(
"stringKey", "stringValue",
@@ -463,8 +470,7 @@ public class AvroIOTest {
File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
String outputFilePrefix = baseOutputFile.getAbsolutePath();
- AvroIO.Write<String> write =
- AvroIO.write(String.class).to(outputFilePrefix);
+ AvroIO.Write<String> write = AvroIO.write(String.class).to(outputFilePrefix);
if (numShards > 1) {
System.out.println("NumShards " + numShards);
write = write.withNumShards(numShards);
@@ -524,7 +530,7 @@ public class AvroIOTest {
@Test
public void testReadDisplayData() {
- AvroIO.Read<?> read = AvroIO.read(String.class).from("foo.*");
+ AvroIO.Read<String> read = AvroIO.read(String.class).from("foo.*");
DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
@@ -535,7 +541,7 @@ public class AvroIOTest {
public void testPrimitiveReadDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- AvroIO.Read<?> read =
+ AvroIO.Read<GenericRecord> read =
AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("foo.*");
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
@@ -545,7 +551,7 @@ public class AvroIOTest {
@Test
public void testWriteDisplayData() {
- AvroIO.Write<?> write = AvroIO.write(GenericClass.class)
+ AvroIO.Write<GenericClass> write = AvroIO.write(GenericClass.class)
.to("foo")
.withShardNameTemplate("-SS-of-NN-")
.withSuffix("bar")
@@ -572,8 +578,8 @@ public class AvroIOTest {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
- AvroIO.Write<?> write = AvroIO.writeGenericRecords(Schema.create(Schema.Type.STRING))
- .to(outputPath);
+ AvroIO.Write<GenericRecord> write =
+ AvroIO.writeGenericRecords(Schema.create(Schema.Type.STRING)).to(outputPath);
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
assertThat("AvroIO.Write should include the file pattern in its primitive transform",
[09/11] beam git commit: Adds AvroIO.readGenericRecords()
Posted by jk...@apache.org.
Adds AvroIO.readGenericRecords()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ff7a1d42
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ff7a1d42
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ff7a1d42
Branch: refs/heads/master
Commit: ff7a1d42f2902bebdf998d3f00b2b268ba150058
Parents: 1499d25
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 18:36:20 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700
----------------------------------------------------------------------
.../beam/runners/spark/io/AvroPipelineTest.java | 2 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 31 ++++++++------------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 8 ++---
.../apache/beam/sdk/io/AvroIOTransformTest.java | 8 ++---
4 files changed, 19 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index e3a44d2..62db14f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -74,7 +74,7 @@ public class AvroPipelineTest {
Pipeline p = pipelineRule.createPipeline();
PCollection<GenericRecord> input = p.apply(
- AvroIO.read().from(inputFile.getAbsolutePath()).withSchema(schema));
+ AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
p.run().waitUntilFinish();
http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index abde9cb..ed172d1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -133,6 +133,18 @@ public class AvroIO {
return new Read<>();
}
+ /** Reads Avro file(s) containing records of the specified schema. */
+ public static Read<GenericRecord> readGenericRecords(Schema schema) {
+ return new Read<>(null, null, GenericRecord.class, schema);
+ }
+
+ /**
+ * Like {@link #readGenericRecords(Schema)} but the schema is specified as a JSON-encoded string.
+ */
+ public static Read<GenericRecord> readGenericRecords(String schema) {
+ return readGenericRecords(new Schema.Parser().parse(schema));
+ }
+
/** Implementation of {@link #read}. */
public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
/** The filepattern to read from. */
@@ -178,25 +190,6 @@ public class AvroIO {
return new Read<>(name, filepattern, type, ReflectData.get().getSchema(type));
}
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that reads Avro file(s) containing records of the specified schema.
- */
- public Read<GenericRecord> withSchema(Schema schema) {
- return new Read<>(name, filepattern, GenericRecord.class, schema);
- }
-
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that reads Avro file(s) containing records of the specified schema
- * in a JSON-encoded string form.
- *
- * <p>Does not modify this object.
- */
- public Read<GenericRecord> withSchema(String schema) {
- return withSchema((new Schema.Parser()).parse(schema));
- }
-
@Override
public PCollection<T> expand(PBegin input) {
if (filepattern == null) {
http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 6d842b3..2144b0d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -282,10 +282,6 @@ public class AvroIOTest {
p.run();
}
- private TimestampedValue<GenericClass> newValue(GenericClass element, Duration duration) {
- return TimestampedValue.of(element, new Instant(0).plus(duration));
- }
-
private static class WindowedFilenamePolicy extends FilenamePolicy {
String outputFilePrefix;
@@ -550,8 +546,8 @@ public class AvroIOTest {
public void testPrimitiveReadDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- AvroIO.Read<?> read = AvroIO.read().from("foo.*")
- .withSchema(Schema.create(Schema.Type.STRING));
+ AvroIO.Read<?> read =
+ AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("foo.*");
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat("AvroIO.Read should include the file pattern in its primitive transform",
http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index 06b9841..b974663 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -185,14 +185,14 @@ public class AvroIOTransformTest {
// test read using schema object
new Object[] {
null,
- AvroIO.read().withSchema(SCHEMA),
+ AvroIO.readGenericRecords(SCHEMA),
"AvroIO.Read/Read.out",
generateAvroGenericRecords(),
fromSchema
},
new Object[] {
"MyRead",
- AvroIO.read().withSchema(SCHEMA),
+ AvroIO.readGenericRecords(SCHEMA),
"MyRead/Read.out",
generateAvroGenericRecords(),
fromSchema
@@ -201,14 +201,14 @@ public class AvroIOTransformTest {
// test read using schema string
new Object[] {
null,
- AvroIO.read().withSchema(SCHEMA_STRING),
+ AvroIO.readGenericRecords(SCHEMA_STRING),
"AvroIO.Read/Read.out",
generateAvroGenericRecords(),
fromSchemaString
},
new Object[] {
"MyRead",
- AvroIO.read().withSchema(SCHEMA_STRING),
+ AvroIO.readGenericRecords(SCHEMA_STRING),
"MyRead/Read.out",
generateAvroGenericRecords(),
fromSchemaString
[04/11] beam git commit: Fixes javadoc of TextIO to not point to AvroIO
Posted by jk...@apache.org.
Fixes javadoc of TextIO to not point to AvroIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2fa3c348
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2fa3c348
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2fa3c348
Branch: refs/heads/master
Commit: 2fa3c348d5a6aab6a7da7c6c62f6b9254feb13af
Parents: 6d443bc
Author: Eugene Kirpichov <ki...@google.com>
Authored: Sat Apr 29 16:15:24 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700
----------------------------------------------------------------------
sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2fa3c348/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 6b58391..0947702 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -85,9 +85,9 @@ import org.apache.beam.sdk.values.PDone;
*
* <p>By default, all input is put into the global window before writing. If per-window writes are
* desired - for example, when using a streaming runner -
- * {@link AvroIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering to be
+ * {@link TextIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering to be
* preserved. When producing windowed writes, the number of output shards must be set explicitly
- * using {@link AvroIO.Write.Bound#withNumShards(int)}; some runners may set this for you to a
+ * using {@link TextIO.Write.Bound#withNumShards(int)}; some runners may set this for you to a
* runner-chosen value, so you may not need to set it yourself. A {@link FilenamePolicy} must be
* set, and unique windows and triggers must produce unique filenames.
*
[06/11] beam git commit: Removes AvroIO.Read.Bound
Posted by jk...@apache.org.
Removes AvroIO.Read.Bound
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1499d256
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1499d256
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1499d256
Branch: refs/heads/master
Commit: 1499d256c616e34b4416fa202a45aa256ac88d20
Parents: 0166e19
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 18:19:21 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700
----------------------------------------------------------------------
.../beam/runners/spark/io/AvroPipelineTest.java | 2 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 222 +++++++------------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 24 +-
.../apache/beam/sdk/io/AvroIOTransformTest.java | 18 +-
4 files changed, 108 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 2a73c28..e3a44d2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -74,7 +74,7 @@ public class AvroPipelineTest {
Pipeline p = pipelineRule.createPipeline();
PCollection<GenericRecord> input = p.apply(
- AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
+ AvroIO.read().from(inputFile.getAbsolutePath()).withSchema(schema));
input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
p.run().waitUntilFinish();
http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 75e14d5..abde9cb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -63,7 +63,7 @@ import org.apache.beam.sdk.values.PDone;
*
* // A simple Read of a local file (only runs locally):
* PCollection<AvroAutoGenClass> records =
- * p.apply(AvroIO.Read.from("/path/to/file.avro")
+ * p.apply(AvroIO.read().from("/path/to/file.avro")
* .withSchema(AvroAutoGenClass.class));
*
* // A Read from a GCS file (runs locally and using remote execution):
@@ -125,15 +125,39 @@ import org.apache.beam.sdk.values.PDone;
*/
public class AvroIO {
/**
- * A root {@link PTransform} that reads from an Avro file (or multiple Avro
- * files matching a pattern) and returns a {@link PCollection} containing
- * the decoding of each record.
+ * Reads records of the given type from an Avro file (or multiple Avro files matching a pattern).
+ *
+ * <p>The schema must be specified using one of the {@code withSchema} functions.
*/
- public static class Read {
+ public static <T> Read<T> read() {
+ return new Read<>();
+ }
+
+ /** Implementation of {@link #read}. */
+ public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+ /** The filepattern to read from. */
+ @Nullable
+ final String filepattern;
+ /** The class type of the records. */
+ @Nullable
+ final Class<T> type;
+ /** The schema of the input file. */
+ @Nullable
+ final Schema schema;
+
+ Read() {
+ this(null, null, null, null);
+ }
+
+ Read(String name, String filepattern, Class<T> type, Schema schema) {
+ super(name);
+ this.filepattern = filepattern;
+ this.type = type;
+ this.schema = schema;
+ }
/**
- * Returns a {@link PTransform} that reads from the file(s)
- * with the given name or pattern. This can be a local filename
+ * Reads from the file(s) with the given name or pattern. This can be a local filename
* or filename pattern (if running locally), or a Google Cloud
* Storage filename or filename pattern of the form
* {@code "gs://<bucket>/<filepath>"} (if running locally or
@@ -141,162 +165,82 @@ public class AvroIO {
* <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html">Java
* Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
*/
- public static Bound<GenericRecord> from(String filepattern) {
- return new Bound<>(GenericRecord.class).from(filepattern);
+ public Read<T> from(String filepattern) {
+ return new Read<>(name, filepattern, type, schema);
}
/**
- * Returns a {@link PTransform} that reads Avro file(s)
- * containing records whose type is the specified Avro-generated class.
- *
- * @param <T> the type of the decoded elements, and the elements
- * of the resulting {@link PCollection}
+ * Returns a new {@link PTransform} that's like this one but
+ * that reads Avro file(s) containing records whose type is the
+ * specified Avro-generated class.
*/
- public static <T> Bound<T> withSchema(Class<T> type) {
- return new Bound<>(type).withSchema(type);
+ public Read<T> withSchema(Class<T> type) {
+ return new Read<>(name, filepattern, type, ReflectData.get().getSchema(type));
}
/**
- * Returns a {@link PTransform} that reads Avro file(s)
- * containing records of the specified schema.
+ * Returns a new {@link PTransform} that's like this one but
+ * that reads Avro file(s) containing records of the specified schema.
*/
- public static Bound<GenericRecord> withSchema(Schema schema) {
- return new Bound<>(GenericRecord.class).withSchema(schema);
+ public Read<GenericRecord> withSchema(Schema schema) {
+ return new Read<>(name, filepattern, GenericRecord.class, schema);
}
/**
- * Returns a {@link PTransform} that reads Avro file(s)
- * containing records of the specified schema in a JSON-encoded
- * string form.
+ * Returns a new {@link PTransform} that's like this one but
+ * that reads Avro file(s) containing records of the specified schema
+ * in a JSON-encoded string form.
+ *
+ * <p>Does not modify this object.
*/
- public static Bound<GenericRecord> withSchema(String schema) {
+ public Read<GenericRecord> withSchema(String schema) {
return withSchema((new Schema.Parser()).parse(schema));
}
- /**
- * A {@link PTransform} that reads from an Avro file (or multiple Avro
- * files matching a pattern) and returns a bounded {@link PCollection} containing
- * the decoding of each record.
- *
- * @param <T> the type of each of the elements of the resulting
- * PCollection
- */
- public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
- /** The filepattern to read from. */
- @Nullable
- final String filepattern;
- /** The class type of the records. */
- final Class<T> type;
- /** The schema of the input file. */
- @Nullable
- final Schema schema;
-
- Bound(Class<T> type) {
- this(null, null, type, null);
+ @Override
+ public PCollection<T> expand(PBegin input) {
+ if (filepattern == null) {
+ throw new IllegalStateException(
+ "need to set the filepattern of an AvroIO.Read transform");
}
-
- Bound(String name, String filepattern, Class<T> type, Schema schema) {
- super(name);
- this.filepattern = filepattern;
- this.type = type;
- this.schema = schema;
+ if (schema == null) {
+ throw new IllegalStateException("need to set the schema of an AvroIO.Read transform");
}
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that reads from the file(s) with the given name or pattern.
- * (See {@link AvroIO.Read#from} for a description of
- * filepatterns.)
- *
- * <p>Does not modify this object.
- */
- public Bound<T> from(String filepattern) {
- return new Bound<>(name, filepattern, type, schema);
- }
+ @SuppressWarnings("unchecked")
+ Bounded<T> read =
+ type == GenericRecord.class
+ ? (Bounded<T>) org.apache.beam.sdk.io.Read.from(
+ AvroSource.from(filepattern).withSchema(schema))
+ : org.apache.beam.sdk.io.Read.from(
+ AvroSource.from(filepattern).withSchema(type));
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that reads Avro file(s) containing records whose type is the
- * specified Avro-generated class.
- *
- * <p>Does not modify this object.
- *
- * @param <X> the type of the decoded elements and the elements of
- * the resulting PCollection
- */
- public <X> Bound<X> withSchema(Class<X> type) {
- return new Bound<>(name, filepattern, type, ReflectData.get().getSchema(type));
- }
-
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that reads Avro file(s) containing records of the specified schema.
- *
- * <p>Does not modify this object.
- */
- public Bound<GenericRecord> withSchema(Schema schema) {
- return new Bound<>(name, filepattern, GenericRecord.class, schema);
- }
-
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that reads Avro file(s) containing records of the specified schema
- * in a JSON-encoded string form.
- *
- * <p>Does not modify this object.
- */
- public Bound<GenericRecord> withSchema(String schema) {
- return withSchema((new Schema.Parser()).parse(schema));
- }
-
- @Override
- public PCollection<T> expand(PBegin input) {
- if (filepattern == null) {
- throw new IllegalStateException(
- "need to set the filepattern of an AvroIO.Read transform");
- }
- if (schema == null) {
- throw new IllegalStateException("need to set the schema of an AvroIO.Read transform");
- }
-
- @SuppressWarnings("unchecked")
- Bounded<T> read =
- type == GenericRecord.class
- ? (Bounded<T>) org.apache.beam.sdk.io.Read.from(
- AvroSource.from(filepattern).withSchema(schema))
- : org.apache.beam.sdk.io.Read.from(
- AvroSource.from(filepattern).withSchema(type));
-
- PCollection<T> pcol = input.getPipeline().apply("Read", read);
- // Honor the default output coder that would have been used by this PTransform.
- pcol.setCoder(getDefaultOutputCoder());
- return pcol;
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .addIfNotNull(DisplayData.item("filePattern", filepattern)
- .withLabel("Input File Pattern"));
- }
+ PCollection<T> pcol = input.getPipeline().apply("Read", read);
+ // Honor the default output coder that would have been used by this PTransform.
+ pcol.setCoder(getDefaultOutputCoder());
+ return pcol;
+ }
- @Override
- protected Coder<T> getDefaultOutputCoder() {
- return AvroCoder.of(type, schema);
- }
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .addIfNotNull(DisplayData.item("filePattern", filepattern)
+ .withLabel("Input File Pattern"));
+ }
- public String getFilepattern() {
- return filepattern;
- }
+ @Override
+ protected Coder<T> getDefaultOutputCoder() {
+ return AvroCoder.of(type, schema);
+ }
- public Schema getSchema() {
- return schema;
- }
+ public String getFilepattern() {
+ return filepattern;
}
- /** Disallow construction of utility class. */
- private Read() {}
+ public Schema getSchema() {
+ return schema;
+ }
}
/////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index ece7997..6d842b3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -103,7 +103,7 @@ public class AvroIOTest {
@Test
public void testAvroIOGetName() {
- assertEquals("AvroIO.Read", AvroIO.Read.from("gs://bucket/foo*/baz").getName());
+ assertEquals("AvroIO.Read", AvroIO.read().from("gs://bucket/foo*/baz").getName());
assertEquals("AvroIO.Write", AvroIO.Write.to("gs://bucket/foo/baz").getName());
}
@@ -150,8 +150,11 @@ public class AvroIOTest {
.withSchema(GenericClass.class));
p.run();
- PCollection<GenericClass> input = p
- .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClass.class));
+ PCollection<GenericClass> input =
+ p.apply(
+ AvroIO.<GenericClass>read()
+ .from(outputFile.getAbsolutePath())
+ .withSchema(GenericClass.class));
PAssert.that(input).containsInAnyOrder(values);
p.run();
@@ -173,7 +176,7 @@ public class AvroIOTest {
p.run();
PCollection<GenericClass> input = p
- .apply(AvroIO.Read
+ .apply(AvroIO.<GenericClass>read()
.from(outputFile.getAbsolutePath())
.withSchema(GenericClass.class));
@@ -200,7 +203,7 @@ public class AvroIOTest {
p.run();
PCollection<GenericClass> input = p
- .apply(AvroIO.Read
+ .apply(AvroIO.<GenericClass>read()
.from(outputFile.getAbsolutePath())
.withSchema(GenericClass.class));
@@ -269,8 +272,11 @@ public class AvroIOTest {
List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null),
new GenericClassV2(5, "bar", null));
- PCollection<GenericClassV2> input = p
- .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClassV2.class));
+ PCollection<GenericClassV2> input =
+ p.apply(
+ AvroIO.<GenericClassV2>read()
+ .from(outputFile.getAbsolutePath())
+ .withSchema(GenericClassV2.class));
PAssert.that(input).containsInAnyOrder(expected);
p.run();
@@ -533,7 +539,7 @@ public class AvroIOTest {
@Test
public void testReadDisplayData() {
- AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*");
+ AvroIO.Read<?> read = AvroIO.read().from("foo.*");
DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
@@ -544,7 +550,7 @@ public class AvroIOTest {
public void testPrimitiveReadDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*")
+ AvroIO.Read<?> read = AvroIO.read().from("foo.*")
.withSchema(Schema.create(Schema.Type.STRING));
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index 3cf52a4..06b9841 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -138,13 +138,13 @@ public class AvroIOTransformTest {
}
private <T> void runTestRead(@Nullable final String applyName,
- final AvroIO.Read.Bound<T> readBuilder,
+ final AvroIO.Read<T> readBuilder,
final String expectedName,
final T[] expectedOutput) throws Exception {
final File avroFile = tmpFolder.newFile("file.avro");
generateAvroFile(generateAvroObjects(), avroFile);
- final AvroIO.Read.Bound<T> read = readBuilder.from(avroFile.getPath());
+ final AvroIO.Read<T> read = readBuilder.from(avroFile.getPath());
final PCollection<T> output =
applyName == null ? pipeline.apply(read) : pipeline.apply(applyName, read);
@@ -169,14 +169,14 @@ public class AvroIOTransformTest {
// test read using generated class
new Object[] {
null,
- AvroIO.Read.withSchema(AvroGeneratedUser.class),
+ AvroIO.<AvroGeneratedUser>read().withSchema(AvroGeneratedUser.class),
"AvroIO.Read/Read.out",
generateAvroObjects(),
generatedClass
},
new Object[] {
"MyRead",
- AvroIO.Read.withSchema(AvroGeneratedUser.class),
+ AvroIO.<AvroGeneratedUser>read().withSchema(AvroGeneratedUser.class),
"MyRead/Read.out",
generateAvroObjects(),
generatedClass
@@ -185,14 +185,14 @@ public class AvroIOTransformTest {
// test read using schema object
new Object[] {
null,
- AvroIO.Read.withSchema(SCHEMA),
+ AvroIO.read().withSchema(SCHEMA),
"AvroIO.Read/Read.out",
generateAvroGenericRecords(),
fromSchema
},
new Object[] {
"MyRead",
- AvroIO.Read.withSchema(SCHEMA),
+ AvroIO.read().withSchema(SCHEMA),
"MyRead/Read.out",
generateAvroGenericRecords(),
fromSchema
@@ -201,14 +201,14 @@ public class AvroIOTransformTest {
// test read using schema string
new Object[] {
null,
- AvroIO.Read.withSchema(SCHEMA_STRING),
+ AvroIO.read().withSchema(SCHEMA_STRING),
"AvroIO.Read/Read.out",
generateAvroGenericRecords(),
fromSchemaString
},
new Object[] {
"MyRead",
- AvroIO.Read.withSchema(SCHEMA_STRING),
+ AvroIO.read().withSchema(SCHEMA_STRING),
"MyRead/Read.out",
generateAvroGenericRecords(),
fromSchemaString
@@ -221,7 +221,7 @@ public class AvroIOTransformTest {
public String transformName;
@Parameterized.Parameter(1)
- public AvroIO.Read.Bound readTransform;
+ public AvroIO.Read readTransform;
@Parameterized.Parameter(2)
public String expectedReadTransformName;
[10/11] beam git commit: Moves AvroIO.Read.withSchema into read()
Posted by jk...@apache.org.
Moves AvroIO.Read.withSchema into read()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/abb4916c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/abb4916c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/abb4916c
Branch: refs/heads/master
Commit: abb4916ce2fa8d4a5caf783b66cc5541053ea83c
Parents: d1dfd4e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 19:03:25 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 35 ++++++++------------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 24 ++++++--------
.../apache/beam/sdk/io/AvroIOTransformTest.java | 4 +--
3 files changed, 25 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/abb4916c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 4bde6ec..08fc8a9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -46,16 +46,15 @@ import org.apache.beam.sdk.values.PDone;
* {@link PTransform}s for reading and writing Avro files.
*
* <p>To read a {@link PCollection} from one or more Avro files, use
- * {@link AvroIO.Read}, specifying {@link AvroIO.Read#from} to specify
+ * {@code AvroIO.read()}, specifying {@link AvroIO.Read#from} to specify
* the path of the file(s) to read from (e.g., a local filename or
* filename pattern if running locally, or a Google Cloud Storage
* filename or filename pattern of the form {@code "gs://<bucket>/<filepath>"}).
*
- * <p>It is required to specify {@link AvroIO.Read#withSchema}. To
- * read specific records, such as Avro-generated classes, provide an
- * Avro-generated class type. To read {@link GenericRecord GenericRecords}, provide either
- * a {@link Schema} object or an Avro schema in a JSON-encoded string form.
- * An exception will be thrown if a record doesn't match the specified
+ * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}.
+ * To read {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes
+ * a {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
+ * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
* schema.
*
* <p>For example:
@@ -64,15 +63,13 @@ import org.apache.beam.sdk.values.PDone;
*
* // A simple Read of a local file (only runs locally):
* PCollection<AvroAutoGenClass> records =
- * p.apply(AvroIO.read().from("/path/to/file.avro")
- * .withSchema(AvroAutoGenClass.class));
+ * p.apply(AvroIO.read(AvroAutoGenClass.class).from("/path/to/file.avro"));
*
* // A Read from a GCS file (runs locally and using remote execution):
* Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
* PCollection<GenericRecord> records =
- * p.apply(AvroIO.read()
- * .from("gs://my_bucket/path/to/records-*.avro")
- * .withSchema(schema));
+ * p.apply(AvroIO.readGenericRecords(schema)
+ * .from("gs://my_bucket/path/to/records-*.avro"));
* } </pre>
*
* <p>To write a {@link PCollection} to one or more Avro files, use
@@ -130,8 +127,11 @@ public class AvroIO {
*
* <p>The schema must be specified using one of the {@code withSchema} functions.
*/
- public static <T> Read<T> read() {
- return new AutoValue_AvroIO_Read.Builder<T>().build();
+ public static <T> Read<T> read(Class<T> recordClass) {
+ return new AutoValue_AvroIO_Read.Builder<T>()
+ .setRecordClass(recordClass)
+ .setSchema(ReflectData.get().getSchema(recordClass))
+ .build();
}
/** Reads Avro file(s) containing records of the specified schema. */
@@ -188,15 +188,6 @@ public class AvroIO {
return toBuilder().setFilepattern(filepattern).build();
}
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that reads Avro file(s) containing records whose type is the
- * specified Avro-generated class.
- */
- public Read<T> withSchema(Class<T> type) {
- return toBuilder().setRecordClass(type).setSchema(ReflectData.get().getSchema(type)).build();
- }
-
@Override
public PCollection<T> expand(PBegin input) {
if (getFilepattern() == null) {
http://git-wip-us.apache.org/repos/asf/beam/blob/abb4916c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 7df1b18..38984b5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -102,7 +102,7 @@ public class AvroIOTest {
@Test
public void testAvroIOGetName() {
- assertEquals("AvroIO.Read", AvroIO.read().from("gs://bucket/foo*/baz").getName());
+ assertEquals("AvroIO.Read", AvroIO.read(String.class).from("gs://bucket/foo*/baz").getName());
assertEquals("AvroIO.Write", AvroIO.write().to("gs://bucket/foo/baz").getName());
}
@@ -151,9 +151,8 @@ public class AvroIOTest {
PCollection<GenericClass> input =
p.apply(
- AvroIO.<GenericClass>read()
- .from(outputFile.getAbsolutePath())
- .withSchema(GenericClass.class));
+ AvroIO.read(GenericClass.class)
+ .from(outputFile.getAbsolutePath()));
PAssert.that(input).containsInAnyOrder(values);
p.run();
@@ -175,9 +174,8 @@ public class AvroIOTest {
p.run();
PCollection<GenericClass> input = p
- .apply(AvroIO.<GenericClass>read()
- .from(outputFile.getAbsolutePath())
- .withSchema(GenericClass.class));
+ .apply(AvroIO.read(GenericClass.class)
+ .from(outputFile.getAbsolutePath()));
PAssert.that(input).containsInAnyOrder(values);
p.run();
@@ -202,9 +200,8 @@ public class AvroIOTest {
p.run();
PCollection<GenericClass> input = p
- .apply(AvroIO.<GenericClass>read()
- .from(outputFile.getAbsolutePath())
- .withSchema(GenericClass.class));
+ .apply(AvroIO.read(GenericClass.class)
+ .from(outputFile.getAbsolutePath()));
PAssert.that(input).containsInAnyOrder(values);
p.run();
@@ -273,9 +270,8 @@ public class AvroIOTest {
PCollection<GenericClassV2> input =
p.apply(
- AvroIO.<GenericClassV2>read()
- .from(outputFile.getAbsolutePath())
- .withSchema(GenericClassV2.class));
+ AvroIO.read(GenericClassV2.class)
+ .from(outputFile.getAbsolutePath()));
PAssert.that(input).containsInAnyOrder(expected);
p.run();
@@ -535,7 +531,7 @@ public class AvroIOTest {
@Test
public void testReadDisplayData() {
- AvroIO.Read<?> read = AvroIO.read().from("foo.*");
+ AvroIO.Read<?> read = AvroIO.read(String.class).from("foo.*");
DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
http://git-wip-us.apache.org/repos/asf/beam/blob/abb4916c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index ba7f1b9..51c9691 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -169,14 +169,14 @@ public class AvroIOTransformTest {
// test read using generated class
new Object[] {
null,
- AvroIO.<AvroGeneratedUser>read().withSchema(AvroGeneratedUser.class),
+ AvroIO.read(AvroGeneratedUser.class),
"AvroIO.Read/Read.out",
generateAvroObjects(),
generatedClass
},
new Object[] {
"MyRead",
- AvroIO.<AvroGeneratedUser>read().withSchema(AvroGeneratedUser.class),
+ AvroIO.read(AvroGeneratedUser.class),
"MyRead/Read.out",
generateAvroObjects(),
generatedClass
[02/11] beam git commit: Moves AvroIO.write().withSchema into write()
Posted by jk...@apache.org.
Moves AvroIO.write().withSchema into write()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/27d74622
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/27d74622
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/27d74622
Branch: refs/heads/master
Commit: 27d74622e877d017aa70feef0ee4cd26a4bece7a
Parents: e0d7475
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 19:25:45 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 49 +++++++++-----------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 42 +++++++----------
.../apache/beam/sdk/io/AvroIOTransformTest.java | 2 +-
3 files changed, 40 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/27d74622/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 8cdd4e7..6b66a98 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -89,26 +89,23 @@ import org.apache.beam.sdk.values.PDone;
* {@link FileBasedSink.FilenamePolicy} must be set, and unique windows and triggers must produce
* unique filenames.
*
- * <p>It is required to specify {@link AvroIO.Write#withSchema}. To
- * write specific records, such as Avro-generated classes, provide an
- * Avro-generated class type. To write {@link GenericRecord GenericRecords}, provide either
- * a {@link Schema} object or a schema in a JSON-encoded string form.
- * An exception will be thrown if a record doesn't match the specified
- * schema.
+ * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}.
+ * To write {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)}
+ * which takes a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema
+ * in a JSON-encoded string form. An exception will be thrown if a record doesn't match the
+ * specified schema.
*
* <p>For example:
* <pre> {@code
* // A simple Write to a local file (only runs locally):
* PCollection<AvroAutoGenClass> records = ...;
- * records.apply(AvroIO.write().to("/path/to/file.avro")
- * .withSchema(AvroAutoGenClass.class));
+ * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));
*
* // A Write to a sharded GCS file (runs locally and using remote execution):
* Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
* PCollection<GenericRecord> records = ...;
- * records.apply("WriteToAvro", AvroIO.write()
+ * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
* .to("gs://my_bucket/path/to/numbers")
- * .withSchema(schema)
* .withSuffix(".avro"));
* } </pre>
*
@@ -153,26 +150,31 @@ public class AvroIO {
* Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
* pattern).
*/
- public static <T> Write<T> write() {
- return new AutoValue_AvroIO_Write.Builder<T>()
- .setFilenameSuffix("")
- .setNumShards(0)
- .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
- .setCodec(Write.DEFAULT_CODEC)
- .setMetadata(ImmutableMap.<String, Object>of())
- .setWindowedWrites(false)
+ public static <T> Write<T> write(Class<T> recordClass) {
+ return AvroIO.<T>defaultWriteBuilder()
+ .setRecordClass(recordClass)
+ .setSchema(ReflectData.get().getSchema(recordClass))
.build();
}
/** Writes Avro records of the specified schema. */
public static Write<GenericRecord> writeGenericRecords(Schema schema) {
- return AvroIO.<GenericRecord>write()
- .toBuilder()
+ return AvroIO.<GenericRecord>defaultWriteBuilder()
.setRecordClass(GenericRecord.class)
.setSchema(schema)
.build();
}
+ private static <T> Write.Builder<T> defaultWriteBuilder() {
+ return new AutoValue_AvroIO_Write.Builder<T>()
+ .setFilenameSuffix("")
+ .setNumShards(0)
+ .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
+ .setCodec(Write.DEFAULT_CODEC)
+ .setMetadata(ImmutableMap.<String, Object>of())
+ .setWindowedWrites(false);
+ }
+
/**
* Like {@link #writeGenericRecords(Schema)} but the schema is specified as a JSON-encoded string.
*/
@@ -369,13 +371,6 @@ public class AvroIO {
return toBuilder().setWindowedWrites(true).build();
}
- /**
- * Writes to Avro file(s) containing records whose type is the specified Avro-generated class.
- */
- public Write<T> withSchema(Class<T> type) {
- return toBuilder().setRecordClass(type).setSchema(ReflectData.get().getSchema(type)).build();
- }
-
/** Writes to Avro file(s) compressed using specified codec. */
public Write<T> withCodec(CodecFactory codec) {
return toBuilder().setCodec(new SerializableAvroCodecFactory(codec)).build();
http://git-wip-us.apache.org/repos/asf/beam/blob/27d74622/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 4abd3e0..e421b96 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -45,7 +45,6 @@ import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.Nullable;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
@@ -103,7 +102,7 @@ public class AvroIOTest {
@Test
public void testAvroIOGetName() {
assertEquals("AvroIO.Read", AvroIO.read(String.class).from("gs://bucket/foo*/baz").getName());
- assertEquals("AvroIO.Write", AvroIO.write().to("gs://bucket/foo/baz").getName());
+ assertEquals("AvroIO.Write", AvroIO.write(String.class).to("gs://bucket/foo/baz").getName());
}
@DefaultCoder(AvroCoder.class)
@@ -144,9 +143,8 @@ public class AvroIOTest {
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
- .withoutSharding()
- .withSchema(GenericClass.class));
+ .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
+ .withoutSharding());
p.run();
PCollection<GenericClass> input =
@@ -167,10 +165,9 @@ public class AvroIOTest {
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
+ .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
.withoutSharding()
- .withCodec(CodecFactory.deflateCodec(9))
- .withSchema(GenericClass.class));
+ .withCodec(CodecFactory.deflateCodec(9)));
p.run();
PCollection<GenericClass> input = p
@@ -193,9 +190,8 @@ public class AvroIOTest {
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
+ .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
.withoutSharding()
- .withSchema(GenericClass.class)
.withCodec(CodecFactory.nullCodec()));
p.run();
@@ -260,9 +256,8 @@ public class AvroIOTest {
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
- .withoutSharding()
- .withSchema(GenericClass.class));
+ .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
+ .withoutSharding());
p.run();
List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null),
@@ -367,10 +362,9 @@ public class AvroIOTest {
windowedAvroWritePipeline
.apply(values)
.apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
- .apply(AvroIO.<GenericClass>write().to(new WindowedFilenamePolicy(outputFilePrefix))
+ .apply(AvroIO.write(GenericClass.class).to(new WindowedFilenamePolicy(outputFilePrefix))
.withWindowedWrites()
- .withNumShards(2)
- .withSchema(GenericClass.class));
+ .withNumShards(2));
windowedAvroWritePipeline.run();
// Validate that the data written matches the expected elements in the expected order
@@ -402,14 +396,14 @@ public class AvroIOTest {
@Test
public void testWriteWithDefaultCodec() throws Exception {
- AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
+ AvroIO.Write<String> write = AvroIO.write(String.class)
.to("gs://bucket/foo/baz");
assertEquals(CodecFactory.deflateCodec(6).toString(), write.getCodec().toString());
}
@Test
public void testWriteWithCustomCodec() throws Exception {
- AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
+ AvroIO.Write<?> write = AvroIO.write(String.class)
.to("gs://bucket/foo/baz")
.withCodec(CodecFactory.snappyCodec());
assertEquals(SNAPPY_CODEC, write.getCodec().toString());
@@ -418,7 +412,7 @@ public class AvroIOTest {
@Test
@SuppressWarnings("unchecked")
public void testWriteWithSerDeCustomDeflateCodec() throws Exception {
- AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
+ AvroIO.Write<String> write = AvroIO.write(String.class)
.to("gs://bucket/foo/baz")
.withCodec(CodecFactory.deflateCodec(9));
@@ -430,7 +424,7 @@ public class AvroIOTest {
@Test
@SuppressWarnings("unchecked")
public void testWriteWithSerDeCustomXZCodec() throws Exception {
- AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
+ AvroIO.Write<String> write = AvroIO.write(String.class)
.to("gs://bucket/foo/baz")
.withCodec(CodecFactory.xzCodec(9));
@@ -448,9 +442,8 @@ public class AvroIOTest {
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
+ .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
.withoutSharding()
- .withSchema(GenericClass.class)
.withMetadata(ImmutableMap.<String, Object>of(
"stringKey", "stringValue",
"longKey", 100L,
@@ -471,7 +464,7 @@ public class AvroIOTest {
String outputFilePrefix = baseOutputFile.getAbsolutePath();
AvroIO.Write<String> write =
- AvroIO.<String>write().to(outputFilePrefix).withSchema(String.class);
+ AvroIO.write(String.class).to(outputFilePrefix);
if (numShards > 1) {
System.out.println("NumShards " + numShards);
write = write.withNumShards(numShards);
@@ -552,11 +545,10 @@ public class AvroIOTest {
@Test
public void testWriteDisplayData() {
- AvroIO.Write<?> write = AvroIO.<GenericClass>write()
+ AvroIO.Write<?> write = AvroIO.write(GenericClass.class)
.to("foo")
.withShardNameTemplate("-SS-of-NN-")
.withSuffix("bar")
- .withSchema(GenericClass.class)
.withNumShards(100)
.withCodec(CodecFactory.snappyCodec());
http://git-wip-us.apache.org/repos/asf/beam/blob/27d74622/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index fb57d5c..b4f7a79 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -271,7 +271,7 @@ public class AvroIOTransformTest {
ImmutableList.<Object[]>builder()
.add(
new Object[] {
- AvroIO.<AvroGeneratedUser>write().withSchema(AvroGeneratedUser.class),
+ AvroIO.write(AvroGeneratedUser.class),
generatedClass
},
new Object[] {
[11/11] beam git commit: This closes #2778
Posted by jk...@apache.org.
This closes #2778
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/034565c6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/034565c6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/034565c6
Branch: refs/heads/master
Commit: 034565c6811833fa1143362fb44f94672cef1e30
Parents: 6d443bc caf2fae
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon May 1 18:43:45 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:45 2017 -0700
----------------------------------------------------------------------
.../beam/runners/spark/io/AvroPipelineTest.java | 4 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 1193 +++++-------------
.../java/org/apache/beam/sdk/io/AvroSink.java | 142 +++
.../java/org/apache/beam/sdk/io/AvroSource.java | 4 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 4 +-
.../beam/sdk/testing/SourceTestUtils.java | 5 +-
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 108 +-
.../apache/beam/sdk/io/AvroIOTransformTest.java | 30 +-
8 files changed, 521 insertions(+), 969 deletions(-)
----------------------------------------------------------------------
[08/11] beam git commit: Converts AvroIO.Write to AutoValue;
adds writeGenericRecords()
Posted by jk...@apache.org.
Converts AvroIO.Write to AutoValue; adds writeGenericRecords()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0d74750
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0d74750
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0d74750
Branch: refs/heads/master
Commit: e0d74750da73658a067e7522f18c23c5e622fb2f
Parents: abb4916
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 19:21:15 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700
----------------------------------------------------------------------
.../beam/runners/spark/io/AvroPipelineTest.java | 2 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 355 +++++--------------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 19 +-
.../apache/beam/sdk/io/AvroIOTransformTest.java | 4 +-
4 files changed, 105 insertions(+), 275 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e0d74750/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index c58d81e..7188dc5 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -75,7 +75,7 @@ public class AvroPipelineTest {
Pipeline p = pipelineRule.createPipeline();
PCollection<GenericRecord> input = p.apply(
AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
- input.apply(AvroIO.write().to(outputDir.getAbsolutePath()).withSchema(schema));
+ input.apply(AvroIO.writeGenericRecords(schema).to(outputDir.getAbsolutePath()));
p.run().waitUntilFinish();
List<GenericRecord> records = readGenericFile();
http://git-wip-us.apache.org/repos/asf/beam/blob/e0d74750/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 08fc8a9..8cdd4e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -154,7 +154,30 @@ public class AvroIO {
* pattern).
*/
public static <T> Write<T> write() {
- return new Write<>(null);
+ return new AutoValue_AvroIO_Write.Builder<T>()
+ .setFilenameSuffix("")
+ .setNumShards(0)
+ .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
+ .setCodec(Write.DEFAULT_CODEC)
+ .setMetadata(ImmutableMap.<String, Object>of())
+ .setWindowedWrites(false)
+ .build();
+ }
+
+ /** Writes Avro records of the specified schema. */
+ public static Write<GenericRecord> writeGenericRecords(Schema schema) {
+ return AvroIO.<GenericRecord>write()
+ .toBuilder()
+ .setRecordClass(GenericRecord.class)
+ .setSchema(schema)
+ .build();
+ }
+
+ /**
+ * Like {@link #writeGenericRecords(Schema)} but the schema is specified as a JSON-encoded string.
+ */
+ public static Write<GenericRecord> writeGenericRecords(String schema) {
+ return writeGenericRecords(new Schema.Parser().parse(schema));
}
/** Implementation of {@link #read}. */
@@ -229,7 +252,8 @@ public class AvroIO {
/////////////////////////////////////////////////////////////////////////////
/** Implementation of {@link #write}. */
- public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+ @AutoValue
+ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
/**
* A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or
* multiple Avro files matching a sharding pattern).
@@ -242,80 +266,38 @@ public class AvroIO {
// This should be a multiple of 4 to not get a partial encoded byte.
private static final int METADATA_BYTES_MAX_LENGTH = 40;
- /** The filename to write to. */
- @Nullable
- final String filenamePrefix;
- /** Suffix to use for each filename. */
- final String filenameSuffix;
- /** Requested number of shards. 0 for automatic. */
- final int numShards;
- /** Shard template string. */
- final String shardTemplate;
- /** The class type of the records. */
- final Class<T> type;
- /** The schema of the output file. */
- @Nullable
- final Schema schema;
- final boolean windowedWrites;
- FileBasedSink.FilenamePolicy filenamePolicy;
-
+ @Nullable abstract String getFilenamePrefix();
+ abstract String getFilenameSuffix();
+ abstract int getNumShards();
+ abstract String getShardTemplate();
+ abstract Class<T> getRecordClass();
+ @Nullable abstract Schema getSchema();
+ abstract boolean getWindowedWrites();
+ @Nullable abstract FileBasedSink.FilenamePolicy getFilenamePolicy();
/**
* The codec used to encode the blocks in the Avro file. String value drawn from those in
* https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
*/
- final SerializableAvroCodecFactory codec;
+ abstract SerializableAvroCodecFactory getCodec();
/** Avro file metadata. */
- final ImmutableMap<String, Object> metadata;
-
- Write(Class<T> type) {
- this(
- null,
- null,
- "",
- 0,
- DEFAULT_SHARD_TEMPLATE,
- type,
- null,
- DEFAULT_CODEC,
- ImmutableMap.<String, Object>of(),
- false,
- null);
- }
+ abstract ImmutableMap<String, Object> getMetadata();
- Write(
- String name,
- String filenamePrefix,
- String filenameSuffix,
- int numShards,
- String shardTemplate,
- Class<T> type,
- Schema schema,
- SerializableAvroCodecFactory codec,
- Map<String, Object> metadata,
- boolean windowedWrites,
- FileBasedSink.FilenamePolicy filenamePolicy) {
- super(name);
- this.filenamePrefix = filenamePrefix;
- this.filenameSuffix = filenameSuffix;
- this.numShards = numShards;
- this.shardTemplate = shardTemplate;
- this.type = type;
- this.schema = schema;
- this.codec = codec;
- this.windowedWrites = windowedWrites;
- this.filenamePolicy = filenamePolicy;
+ abstract Builder<T> toBuilder();
- Map<String, String> badKeys = Maps.newLinkedHashMap();
- for (Map.Entry<String, Object> entry : metadata.entrySet()) {
- Object v = entry.getValue();
- if (!(v instanceof String || v instanceof Long || v instanceof byte[])) {
- badKeys.put(entry.getKey(), v.getClass().getSimpleName());
- }
- }
- checkArgument(
- badKeys.isEmpty(),
- "Metadata value type must be one of String, Long, or byte[]. Found {}", badKeys);
- this.metadata = ImmutableMap.copyOf(metadata);
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setFilenamePrefix(String filenamePrefix);
+ abstract Builder<T> setFilenameSuffix(String filenameSuffix);
+ abstract Builder<T> setNumShards(int numShards);
+ abstract Builder<T> setShardTemplate(String shardTemplate);
+ abstract Builder<T> setRecordClass(Class<T> recordClass);
+ abstract Builder<T> setSchema(Schema schema);
+ abstract Builder<T> setWindowedWrites(boolean windowedWrites);
+ abstract Builder<T> setFilenamePolicy(FileBasedSink.FilenamePolicy filenamePolicy);
+ abstract Builder<T> setCodec(SerializableAvroCodecFactory codec);
+ abstract Builder<T> setMetadata(ImmutableMap<String, Object> metadata);
+
+ abstract Write<T> build();
}
/**
@@ -330,34 +312,12 @@ public class AvroIO {
*/
public Write<T> to(String filenamePrefix) {
validateOutputComponent(filenamePrefix);
- return new Write<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- schema,
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
+ return toBuilder().setFilenamePrefix(filenamePrefix).build();
}
/** Writes to the file(s) specified by the provided {@link FileBasedSink.FilenamePolicy}. */
public Write<T> to(FileBasedSink.FilenamePolicy filenamePolicy) {
- return new Write<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- schema,
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
+ return toBuilder().setFilenamePolicy(filenamePolicy).build();
}
/**
@@ -367,18 +327,7 @@ public class AvroIO {
*/
public Write<T> withSuffix(String filenameSuffix) {
validateOutputComponent(filenameSuffix);
- return new Write<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- schema,
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
+ return toBuilder().setFilenameSuffix(filenameSuffix).build();
}
/**
@@ -394,18 +343,7 @@ public class AvroIO {
*/
public Write<T> withNumShards(int numShards) {
checkArgument(numShards >= 0);
- return new Write<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- schema,
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
+ return toBuilder().setNumShards(numShards).build();
}
/**
@@ -415,18 +353,7 @@ public class AvroIO {
* @see ShardNameTemplate
*/
public Write<T> withShardNameTemplate(String shardTemplate) {
- return new Write<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- schema,
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
+ return toBuilder().setShardTemplate(shardTemplate).build();
}
/**
@@ -439,76 +366,19 @@ public class AvroIO {
}
public Write<T> withWindowedWrites() {
- return new Write<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- schema,
- codec,
- metadata,
- true,
- filenamePolicy);
+ return toBuilder().setWindowedWrites(true).build();
}
/**
* Writes to Avro file(s) containing records whose type is the specified Avro-generated class.
*/
public Write<T> withSchema(Class<T> type) {
- return new Write<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- ReflectData.get().getSchema(type),
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
- }
-
- /** Writes to Avro file(s) containing records of the specified schema. */
- public Write<GenericRecord> withSchema(Schema schema) {
- return new Write<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- GenericRecord.class,
- schema,
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
- }
-
- /**
- * Writes to Avro file(s) containing records of the specified schema in a JSON-encoded string
- * form.
- */
- public Write<GenericRecord> withSchema(String schema) {
- return withSchema((new Schema.Parser()).parse(schema));
+ return toBuilder().setRecordClass(type).setSchema(ReflectData.get().getSchema(type)).build();
}
/** Writes to Avro file(s) compressed using specified codec. */
public Write<T> withCodec(CodecFactory codec) {
- return new Write<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- schema,
- new SerializableAvroCodecFactory(codec),
- metadata,
- windowedWrites,
- filenamePolicy);
+ return toBuilder().setCodec(new SerializableAvroCodecFactory(codec)).build();
}
/**
@@ -517,56 +387,56 @@ public class AvroIO {
* <p>Supported value types are String, Long, and byte[].
*/
public Write<T> withMetadata(Map<String, Object> metadata) {
- return new Write<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- schema,
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
+ Map<String, String> badKeys = Maps.newLinkedHashMap();
+ for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+ Object v = entry.getValue();
+ if (!(v instanceof String || v instanceof Long || v instanceof byte[])) {
+ badKeys.put(entry.getKey(), v.getClass().getSimpleName());
+ }
+ }
+ checkArgument(
+ badKeys.isEmpty(),
+ "Metadata value type must be one of String, Long, or byte[]. Found {}",
+ badKeys);
+ return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build();
}
@Override
public PDone expand(PCollection<T> input) {
- if (filenamePolicy == null && filenamePrefix == null) {
+ if (getFilenamePolicy() == null && getFilenamePrefix() == null) {
throw new IllegalStateException(
"need to set the filename prefix of an AvroIO.Write transform");
}
- if (filenamePolicy != null && filenamePrefix != null) {
+ if (getFilenamePolicy() != null && getFilenamePrefix() != null) {
throw new IllegalStateException(
"cannot set both a filename policy and a filename prefix");
}
- if (schema == null) {
+ if (getSchema() == null) {
throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
}
WriteFiles<T> write = null;
- if (filenamePolicy != null) {
+ if (getFilenamePolicy() != null) {
write = WriteFiles.to(
new AvroSink<>(
- filenamePolicy,
- AvroCoder.of(type, schema),
- codec,
- metadata));
+ getFilenamePolicy(),
+ AvroCoder.of(getRecordClass(), getSchema()),
+ getCodec(),
+ getMetadata()));
} else {
write = WriteFiles.to(
new AvroSink<>(
- filenamePrefix,
- filenameSuffix,
- shardTemplate,
- AvroCoder.of(type, schema),
- codec,
- metadata));
+ getFilenamePrefix(),
+ getFilenameSuffix(),
+ getShardTemplate(),
+ AvroCoder.of(getRecordClass(), getSchema()),
+ getCodec(),
+ getMetadata()));
}
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
- if (windowedWrites) {
+ if (getWindowedWrites()) {
write = write.withWindowedWrites();
}
return input.apply("Write", write);
@@ -576,20 +446,20 @@ public class AvroIO {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
- .add(DisplayData.item("schema", type)
+ .add(DisplayData.item("schema", getRecordClass())
.withLabel("Record Schema"))
- .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)
+ .addIfNotNull(DisplayData.item("filePrefix", getFilenamePrefix())
.withLabel("Output File Prefix"))
- .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
+ .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate())
.withLabel("Output Shard Name Template"),
DEFAULT_SHARD_TEMPLATE)
- .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
+ .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix())
.withLabel("Output File Suffix"),
"")
- .addIfNotDefault(DisplayData.item("numShards", numShards)
+ .addIfNotDefault(DisplayData.item("numShards", getNumShards())
.withLabel("Maximum Output Shards"),
0)
- .addIfNotDefault(DisplayData.item("codec", codec.toString())
+ .addIfNotDefault(DisplayData.item("codec", getCodec().toString())
.withLabel("Avro Compression Codec"),
DEFAULT_CODEC.toString());
builder.include("Metadata", new Metadata());
@@ -598,7 +468,7 @@ public class AvroIO {
private class Metadata implements HasDisplayData {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+ for (Map.Entry<String, Object> entry : getMetadata().entrySet()) {
DisplayData.Type type = DisplayData.inferType(entry.getValue());
if (type != null) {
builder.add(DisplayData.item(entry.getKey(), type, entry.getValue()));
@@ -612,49 +482,10 @@ public class AvroIO {
}
}
- /**
- * Returns the current shard name template string.
- */
- public String getShardNameTemplate() {
- return shardTemplate;
- }
-
@Override
protected Coder<Void> getDefaultOutputCoder() {
return VoidCoder.of();
}
-
- public String getFilenamePrefix() {
- return filenamePrefix;
- }
-
- public String getShardTemplate() {
- return shardTemplate;
- }
-
- public int getNumShards() {
- return numShards;
- }
-
- public String getFilenameSuffix() {
- return filenameSuffix;
- }
-
- public Class<T> getType() {
- return type;
- }
-
- public Schema getSchema() {
- return schema;
- }
-
- public CodecFactory getCodec() {
- return codec.getCodec();
- }
-
- public Map<String, Object> getMetadata() {
- return metadata;
- }
}
// Pattern which matches old-style shard output patterns, which are now
http://git-wip-us.apache.org/repos/asf/beam/blob/e0d74750/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 38984b5..4abd3e0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -422,9 +422,9 @@ public class AvroIOTest {
.to("gs://bucket/foo/baz")
.withCodec(CodecFactory.deflateCodec(9));
- AvroIO.Write<GenericRecord> serdeWrite = SerializableUtils.clone(write);
-
- assertEquals(CodecFactory.deflateCodec(9).toString(), serdeWrite.getCodec().toString());
+ assertEquals(
+ CodecFactory.deflateCodec(9).toString(),
+ SerializableUtils.clone(write.getCodec()).getCodec().toString());
}
@Test
@@ -434,9 +434,9 @@ public class AvroIOTest {
.to("gs://bucket/foo/baz")
.withCodec(CodecFactory.xzCodec(9));
- AvroIO.Write<GenericRecord> serdeWrite = SerializableUtils.clone(write);
-
- assertEquals(CodecFactory.xzCodec(9).toString(), serdeWrite.getCodec().toString());
+ assertEquals(
+ CodecFactory.xzCodec(9).toString(),
+ SerializableUtils.clone(write.getCodec()).getCodec().toString());
}
@Test
@@ -482,7 +482,7 @@ public class AvroIOTest {
p.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write);
p.run();
- String shardNameTemplate = write.getShardNameTemplate();
+ String shardNameTemplate = write.getShardTemplate();
assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate);
}
@@ -580,9 +580,8 @@ public class AvroIOTest {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
- AvroIO.Write<?> write = AvroIO.<GenericRecord>write()
- .to(outputPath)
- .withSchema(Schema.create(Schema.Type.STRING));
+ AvroIO.Write<?> write = AvroIO.writeGenericRecords(Schema.create(Schema.Type.STRING))
+ .to(outputPath);
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
assertThat("AvroIO.Write should include the file pattern in its primitive transform",
http://git-wip-us.apache.org/repos/asf/beam/blob/e0d74750/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index 51c9691..fb57d5c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -275,12 +275,12 @@ public class AvroIOTransformTest {
generatedClass
},
new Object[] {
- AvroIO.write().withSchema(SCHEMA),
+ AvroIO.writeGenericRecords(SCHEMA),
fromSchema
},
new Object[] {
- AvroIO.write().withSchema(SCHEMA_STRING),
+ AvroIO.writeGenericRecords(SCHEMA_STRING),
fromSchemaString
})
.build();
[07/11] beam git commit: Removes AvroIO.Write.Bound
Posted by jk...@apache.org.
Removes AvroIO.Write.Bound
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1dfd4e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1dfd4e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1dfd4e2
Branch: refs/heads/master
Commit: d1dfd4e2a8b82451f28f1f0e6f261eae0d51bb5b
Parents: 439f2ca
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 18:59:03 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700
----------------------------------------------------------------------
.../beam/runners/spark/io/AvroPipelineTest.java | 2 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 910 ++++++++-----------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 37 +-
.../apache/beam/sdk/io/AvroIOTransformTest.java | 12 +-
4 files changed, 385 insertions(+), 576 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d1dfd4e2/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 62db14f..c58d81e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -75,7 +75,7 @@ public class AvroPipelineTest {
Pipeline p = pipelineRule.createPipeline();
PCollection<GenericRecord> input = p.apply(
AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
- input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
+ input.apply(AvroIO.write().to(outputDir.getAbsolutePath()).withSchema(schema));
p.run().waitUntilFinish();
List<GenericRecord> records = readGenericFile();
http://git-wip-us.apache.org/repos/asf/beam/blob/d1dfd4e2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 2f1d917..4bde6ec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -70,24 +70,24 @@ import org.apache.beam.sdk.values.PDone;
* // A Read from a GCS file (runs locally and using remote execution):
* Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
* PCollection<GenericRecord> records =
- * p.apply(AvroIO.Read
+ * p.apply(AvroIO.read()
* .from("gs://my_bucket/path/to/records-*.avro")
* .withSchema(schema));
* } </pre>
*
* <p>To write a {@link PCollection} to one or more Avro files, use
- * {@link AvroIO.Write}, specifying {@link AvroIO.Write#to(String)} to specify
+ * {@link AvroIO.Write}, using {@code AvroIO.write().to(String)} to specify
* the path of the file to write to (e.g., a local filename or sharded
* filename pattern if running locally, or a Google Cloud Storage
* filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}). {@link AvroIO.Write#to(FileBasedSink.FilenamePolicy)}
+ * {@code "gs://<bucket>/<filepath>"}). {@code AvroIO.write().to(FileBasedSink.FilenamePolicy)}
* can also be used to specify a custom file naming policy.
*
* <p>By default, all input is put into the global window before writing. If per-window writes are
* desired - for example, when using a streaming runner -
- * {@link AvroIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering to be
+ * {@link AvroIO.Write#withWindowedWrites()} will cause windowing and triggering to be
* preserved. When producing windowed writes, the number of output shards must be set explicitly
- * using {@link AvroIO.Write.Bound#withNumShards(int)}; some runners may set this for you to a
+ * using {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a
* runner-chosen value, so you may not need to set it yourself. A
* {@link FileBasedSink.FilenamePolicy} must be set, and unique windows and triggers must produce
* unique filenames.
@@ -103,13 +103,13 @@ import org.apache.beam.sdk.values.PDone;
* <pre> {@code
* // A simple Write to a local file (only runs locally):
* PCollection<AvroAutoGenClass> records = ...;
- * records.apply(AvroIO.Write.to("/path/to/file.avro")
+ * records.apply(AvroIO.write().to("/path/to/file.avro")
* .withSchema(AvroAutoGenClass.class));
*
* // A Write to a sharded GCS file (runs locally and using remote execution):
* Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
* PCollection<GenericRecord> records = ...;
- * records.apply("WriteToAvro", AvroIO.Write
+ * records.apply("WriteToAvro", AvroIO.write()
* .to("gs://my_bucket/path/to/numbers")
* .withSchema(schema)
* .withSuffix(".avro"));
@@ -149,6 +149,14 @@ public class AvroIO {
return readGenericRecords(new Schema.Parser().parse(schema));
}
+ /**
+ * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
+ * pattern).
+ */
+ public static <T> Write<T> write() {
+ return new Write<>(null);
+ }
+
/** Implementation of {@link #read}. */
@AutoValue
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
@@ -229,45 +237,161 @@ public class AvroIO {
/////////////////////////////////////////////////////////////////////////////
- /**
- * A root {@link PTransform} that writes a {@link PCollection} to an Avro file (or
- * multiple Avro files matching a sharding pattern).
- */
- public static class Write {
+ /** Implementation of {@link #write}. */
+ public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+ /**
+ * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or
+ * multiple Avro files matching a sharding pattern).
+ *
+ * @param <T> the type of each of the elements of the input PCollection
+ */
+ private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+ private static final SerializableAvroCodecFactory DEFAULT_CODEC =
+ new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
+ // This should be a multiple of 4 to not get a partial encoded byte.
+ private static final int METADATA_BYTES_MAX_LENGTH = 40;
+
+ /** The filename to write to. */
+ @Nullable
+ final String filenamePrefix;
+ /** Suffix to use for each filename. */
+ final String filenameSuffix;
+ /** Requested number of shards. 0 for automatic. */
+ final int numShards;
+ /** Shard template string. */
+ final String shardTemplate;
+ /** The class type of the records. */
+ final Class<T> type;
+ /** The schema of the output file. */
+ @Nullable
+ final Schema schema;
+ final boolean windowedWrites;
+ FileBasedSink.FilenamePolicy filenamePolicy;
/**
- * Returns a {@link PTransform} that writes to the file(s)
- * with the given prefix. This can be a local filename
+ * The codec used to encode the blocks in the Avro file. String value drawn from those in
+ * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
+ */
+ final SerializableAvroCodecFactory codec;
+ /** Avro file metadata. */
+ final ImmutableMap<String, Object> metadata;
+
+ Write(Class<T> type) {
+ this(
+ null,
+ null,
+ "",
+ 0,
+ DEFAULT_SHARD_TEMPLATE,
+ type,
+ null,
+ DEFAULT_CODEC,
+ ImmutableMap.<String, Object>of(),
+ false,
+ null);
+ }
+
+ Write(
+ String name,
+ String filenamePrefix,
+ String filenameSuffix,
+ int numShards,
+ String shardTemplate,
+ Class<T> type,
+ Schema schema,
+ SerializableAvroCodecFactory codec,
+ Map<String, Object> metadata,
+ boolean windowedWrites,
+ FileBasedSink.FilenamePolicy filenamePolicy) {
+ super(name);
+ this.filenamePrefix = filenamePrefix;
+ this.filenameSuffix = filenameSuffix;
+ this.numShards = numShards;
+ this.shardTemplate = shardTemplate;
+ this.type = type;
+ this.schema = schema;
+ this.codec = codec;
+ this.windowedWrites = windowedWrites;
+ this.filenamePolicy = filenamePolicy;
+
+ Map<String, String> badKeys = Maps.newLinkedHashMap();
+ for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+ Object v = entry.getValue();
+ if (!(v instanceof String || v instanceof Long || v instanceof byte[])) {
+ badKeys.put(entry.getKey(), v.getClass().getSimpleName());
+ }
+ }
+ checkArgument(
+ badKeys.isEmpty(),
+ "Metadata value type must be one of String, Long, or byte[]. Found {}", badKeys);
+ this.metadata = ImmutableMap.copyOf(metadata);
+ }
+
+ /**
+ * Writes to the file(s) with the given prefix. This can be a local filename
* (if running locally), or a Google Cloud Storage filename of
* the form {@code "gs://<bucket>/<filepath>"}
* (if running locally or using remote execution).
*
* <p>The files written will begin with this prefix, followed by
- * a shard identifier (see {@link Bound#withNumShards}, and end
- * in a common extension, if given by {@link Bound#withSuffix}.
+ * a shard identifier (see {@link #withNumShards}), and end
+ * in a common extension, if given by {@link #withSuffix}.
*/
- public static Bound<GenericRecord> to(String prefix) {
- return new Bound<>(GenericRecord.class).to(prefix);
+ public Write<T> to(String filenamePrefix) {
+ validateOutputComponent(filenamePrefix);
+ return new Write<>(
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ type,
+ schema,
+ codec,
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
- /**
- * Returns a {@link PTransform} that writes to the file(s) specified by the provided
- * {@link FileBasedSink.FilenamePolicy}.
- */
- public static Bound<GenericRecord> to(FileBasedSink.FilenamePolicy filenamePolicy) {
- return new Bound<>(GenericRecord.class).to(filenamePolicy);
+ /** Writes to the file(s) specified by the provided {@link FileBasedSink.FilenamePolicy}. */
+ public Write<T> to(FileBasedSink.FilenamePolicy filenamePolicy) {
+ return new Write<>(
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ type,
+ schema,
+ codec,
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
/**
- * Returns a {@link PTransform} that writes to the file(s) with the
- * given filename suffix.
+ * Writes to the file(s) with the given filename suffix.
+ *
+ * <p>See {@link ShardNameTemplate} for a description of shard templates.
*/
- public static Bound<GenericRecord> withSuffix(String filenameSuffix) {
- return new Bound<>(GenericRecord.class).withSuffix(filenameSuffix);
+ public Write<T> withSuffix(String filenameSuffix) {
+ validateOutputComponent(filenameSuffix);
+ return new Write<>(
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ type,
+ schema,
+ codec,
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
/**
- * Returns a {@link PTransform} that uses the provided shard count.
+ * Uses the provided shard count.
*
* <p>Constraining the number of shards is likely to reduce
* the performance of a pipeline. Setting this value is not recommended
@@ -275,585 +399,271 @@ public class AvroIO {
*
* @param numShards the number of shards to use, or 0 to let the system
* decide.
+ * @see ShardNameTemplate
*/
- public static Bound<GenericRecord> withNumShards(int numShards) {
- return new Bound<>(GenericRecord.class).withNumShards(numShards);
+ public Write<T> withNumShards(int numShards) {
+ checkArgument(numShards >= 0);
+ return new Write<>(
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ type,
+ schema,
+ codec,
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
/**
- * Returns a {@link PTransform} that uses the given shard name
- * template.
+ * Returns a new {@link PTransform} that's like this one but
+ * that uses the given shard name template.
*
- * <p>See {@link ShardNameTemplate} for a description of shard templates.
+ * @see ShardNameTemplate
*/
- public static Bound<GenericRecord> withShardNameTemplate(String shardTemplate) {
- return new Bound<>(GenericRecord.class).withShardNameTemplate(shardTemplate);
+ public Write<T> withShardNameTemplate(String shardTemplate) {
+ return new Write<>(
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ type,
+ schema,
+ codec,
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
/**
- * Returns a {@link PTransform} that forces a single file as
- * output.
+ * Forces a single file as output.
*
- * <p>Constraining the number of shards is likely to reduce
- * the performance of a pipeline. Setting this value is not recommended
- * unless you require a specific number of output files.
+ * <p>This is a shortcut for {@code .withNumShards(1).withShardNameTemplate("")}
*/
- public static Bound<GenericRecord> withoutSharding() {
- return new Bound<>(GenericRecord.class).withoutSharding();
+ public Write<T> withoutSharding() {
+ return withNumShards(1).withShardNameTemplate("");
}
- /**
- * Returns a {@link PTransform} that writes Avro file(s)
- * containing records whose type is the specified Avro-generated class.
- *
- * @param <T> the type of the elements of the input PCollection
- */
- public static <T> Bound<T> withSchema(Class<T> type) {
- return new Bound<>(type).withSchema(type);
+ public Write<T> withWindowedWrites() {
+ return new Write<>(
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ type,
+ schema,
+ codec,
+ metadata,
+ true,
+ filenamePolicy);
}
/**
- * Returns a {@link PTransform} that writes Avro file(s)
- * containing records of the specified schema.
+ * Writes to Avro file(s) containing records whose type is the specified Avro-generated class.
*/
- public static Bound<GenericRecord> withSchema(Schema schema) {
- return new Bound<>(GenericRecord.class).withSchema(schema);
+ public Write<T> withSchema(Class<T> type) {
+ return new Write<>(
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ type,
+ ReflectData.get().getSchema(type),
+ codec,
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
- /**
- * Returns a {@link PTransform} that writes Avro file(s)
- * containing records of the specified schema in a JSON-encoded
- * string form.
- */
- public static Bound<GenericRecord> withSchema(String schema) {
- return withSchema((new Schema.Parser()).parse(schema));
+ /** Writes to Avro file(s) containing records of the specified schema. */
+ public Write<GenericRecord> withSchema(Schema schema) {
+ return new Write<>(
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ GenericRecord.class,
+ schema,
+ codec,
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
/**
- * Returns a {@link PTransform} that writes Avro file(s) that has GCS path validation on
- * pipeline creation disabled.
- *
- * <p>This can be useful in the case where the GCS output location does
- * not exist at the pipeline creation time, but is expected to be available
- * at execution time.
+ * Writes to Avro file(s) containing records of the specified schema in a JSON-encoded string
+ * form.
*/
- public static Bound<GenericRecord> withoutValidation() {
- return new Bound<>(GenericRecord.class).withoutValidation();
+ public Write<GenericRecord> withSchema(String schema) {
+ return withSchema((new Schema.Parser()).parse(schema));
}
- /**
- * Returns a {@link PTransform} that writes Avro file(s) using specified codec.
- */
- public static Bound<GenericRecord> withCodec(CodecFactory codec) {
- return new Bound<>(GenericRecord.class).withCodec(codec);
+ /** Writes to Avro file(s) compressed using specified codec. */
+ public Write<T> withCodec(CodecFactory codec) {
+ return new Write<>(
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ type,
+ schema,
+ new SerializableAvroCodecFactory(codec),
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
/**
- * Returns a {@link PTransform} that writes Avro file(s) with the specified metadata.
+ * Writes to Avro file(s) with the specified metadata.
*
* <p>Supported value types are String, Long, and byte[].
*/
- public static Bound<GenericRecord> withMetadata(Map<String, Object> metadata) {
- return new Bound<>(GenericRecord.class).withMetadata(metadata);
+ public Write<T> withMetadata(Map<String, Object> metadata) {
+ return new Write<>(
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ type,
+ schema,
+ codec,
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
- /**
- * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or
- * multiple Avro files matching a sharding pattern).
- *
- * @param <T> the type of each of the elements of the input PCollection
- */
- public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
- private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
- private static final SerializableAvroCodecFactory DEFAULT_CODEC =
- new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
- // This should be a multiple of 4 to not get a partial encoded byte.
- private static final int METADATA_BYTES_MAX_LENGTH = 40;
-
- /** The filename to write to. */
- @Nullable
- final String filenamePrefix;
- /** Suffix to use for each filename. */
- final String filenameSuffix;
- /** Requested number of shards. 0 for automatic. */
- final int numShards;
- /** Shard template string. */
- final String shardTemplate;
- /** The class type of the records. */
- final Class<T> type;
- /** The schema of the output file. */
- @Nullable
- final Schema schema;
- final boolean windowedWrites;
- FileBasedSink.FilenamePolicy filenamePolicy;
-
- /**
- * The codec used to encode the blocks in the Avro file. String value drawn from those in
- * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
- */
- final SerializableAvroCodecFactory codec;
- /** Avro file metadata. */
- final ImmutableMap<String, Object> metadata;
-
- Bound(Class<T> type) {
- this(
- null,
- null,
- "",
- 0,
- DEFAULT_SHARD_TEMPLATE,
- type,
- null,
- DEFAULT_CODEC,
- ImmutableMap.<String, Object>of(),
- false,
- null);
- }
-
- Bound(
- String name,
- String filenamePrefix,
- String filenameSuffix,
- int numShards,
- String shardTemplate,
- Class<T> type,
- Schema schema,
- SerializableAvroCodecFactory codec,
- Map<String, Object> metadata,
- boolean windowedWrites,
- FileBasedSink.FilenamePolicy filenamePolicy) {
- super(name);
- this.filenamePrefix = filenamePrefix;
- this.filenameSuffix = filenameSuffix;
- this.numShards = numShards;
- this.shardTemplate = shardTemplate;
- this.type = type;
- this.schema = schema;
- this.codec = codec;
- this.windowedWrites = windowedWrites;
- this.filenamePolicy = filenamePolicy;
-
- Map<String, String> badKeys = Maps.newLinkedHashMap();
- for (Map.Entry<String, Object> entry : metadata.entrySet()) {
- Object v = entry.getValue();
- if (!(v instanceof String || v instanceof Long || v instanceof byte[])) {
- badKeys.put(entry.getKey(), v.getClass().getSimpleName());
- }
- }
- checkArgument(
- badKeys.isEmpty(),
- "Metadata value type must be one of String, Long, or byte[]. Found {}", badKeys);
- this.metadata = ImmutableMap.copyOf(metadata);
- }
-
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that writes to the file(s) with the given filename prefix.
- *
- * <p>See {@link AvroIO.Write#to(String)} for more information
- * about filenames.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> to(String filenamePrefix) {
- validateOutputComponent(filenamePrefix);
- return new Bound<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- schema,
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
- }
-
- public Bound<T> to(FileBasedSink.FilenamePolicy filenamePolicy) {
- return new Bound<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- schema,
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
- }
-
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that writes to the file(s) with the given filename suffix.
- *
- * <p>See {@link ShardNameTemplate} for a description of shard templates.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> withSuffix(String filenameSuffix) {
- validateOutputComponent(filenameSuffix);
- return new Bound<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- schema,
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
- }
-
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that uses the provided shard count.
- *
- * <p>Constraining the number of shards is likely to reduce
- * the performance of a pipeline. Setting this value is not recommended
- * unless you require a specific number of output files.
- *
- * <p>Does not modify this object.
- *
- * @param numShards the number of shards to use, or 0 to let the system
- * decide.
- * @see ShardNameTemplate
- */
- public Bound<T> withNumShards(int numShards) {
- checkArgument(numShards >= 0);
- return new Bound<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- schema,
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
- }
-
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that uses the given shard name template.
- *
- * <p>Does not modify this object.
- *
- * @see ShardNameTemplate
- */
- public Bound<T> withShardNameTemplate(String shardTemplate) {
- return new Bound<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- schema,
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
- }
-
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that forces a single file as output.
- *
- * <p>This is a shortcut for
- * {@code .withNumShards(1).withShardNameTemplate("")}
- *
- * <p>Does not modify this object.
- */
- public Bound<T> withoutSharding() {
- return new Bound<>(
- name,
- filenamePrefix,
- filenameSuffix,
- 1,
- "",
- type,
- schema,
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
- }
-
- public Bound<T> withWindowedWrites() {
- return new Bound<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- schema,
- codec,
- metadata,
- true,
- filenamePolicy);
- }
-
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that writes to Avro file(s) containing records whose type is the
- * specified Avro-generated class.
- *
- * <p>Does not modify this object.
- *
- * @param <X> the type of the elements of the input PCollection
- */
- public <X> Bound<X> withSchema(Class<X> type) {
- return new Bound<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- ReflectData.get().getSchema(type),
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
+ @Override
+ public PDone expand(PCollection<T> input) {
+ if (filenamePolicy == null && filenamePrefix == null) {
+ throw new IllegalStateException(
+ "need to set the filename prefix of an AvroIO.Write transform");
}
-
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that writes to Avro file(s) containing records of the specified
- * schema.
- *
- * <p>Does not modify this object.
- */
- public Bound<GenericRecord> withSchema(Schema schema) {
- return new Bound<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- GenericRecord.class,
- schema,
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
+ if (filenamePolicy != null && filenamePrefix != null) {
+ throw new IllegalStateException(
+ "cannot set both a filename policy and a filename prefix");
}
-
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that writes to Avro file(s) containing records of the specified
- * schema in a JSON-encoded string form.
- *
- * <p>Does not modify this object.
- */
- public Bound<GenericRecord> withSchema(String schema) {
- return withSchema((new Schema.Parser()).parse(schema));
+ if (schema == null) {
+ throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
}
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that has GCS output path validation on pipeline creation disabled.
- *
- * <p>Does not modify this object.
- *
- * <p>This can be useful in the case where the GCS output location does
- * not exist at the pipeline creation time, but is expected to be
- * available at execution time.
- */
- public Bound<T> withoutValidation() {
- return new Bound<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- schema,
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
+ WriteFiles<T> write = null;
+ if (filenamePolicy != null) {
+ write = WriteFiles.to(
+ new AvroSink<>(
+ filenamePolicy,
+ AvroCoder.of(type, schema),
+ codec,
+ metadata));
+ } else {
+ write = WriteFiles.to(
+ new AvroSink<>(
+ filenamePrefix,
+ filenameSuffix,
+ shardTemplate,
+ AvroCoder.of(type, schema),
+ codec,
+ metadata));
}
-
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that writes to Avro file(s) compressed using specified codec.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> withCodec(CodecFactory codec) {
- return new Bound<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- schema,
- new SerializableAvroCodecFactory(codec),
- metadata,
- windowedWrites,
- filenamePolicy);
+ if (getNumShards() > 0) {
+ write = write.withNumShards(getNumShards());
}
-
- /**
- * Returns a new {@link PTransform} that's like this one but
- * that writes to Avro file(s) with the specified metadata.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> withMetadata(Map<String, Object> metadata) {
- return new Bound<>(
- name,
- filenamePrefix,
- filenameSuffix,
- numShards,
- shardTemplate,
- type,
- schema,
- codec,
- metadata,
- windowedWrites,
- filenamePolicy);
+ if (windowedWrites) {
+ write = write.withWindowedWrites();
}
+ return input.apply("Write", write);
+ }
- @Override
- public PDone expand(PCollection<T> input) {
- if (filenamePolicy == null && filenamePrefix == null) {
- throw new IllegalStateException(
- "need to set the filename prefix of an AvroIO.Write transform");
- }
- if (filenamePolicy != null && filenamePrefix != null) {
- throw new IllegalStateException(
- "cannot set both a filename policy and a filename prefix");
- }
- if (schema == null) {
- throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
- }
-
- WriteFiles<T> write = null;
- if (filenamePolicy != null) {
- write = WriteFiles.to(
- new AvroSink<>(
- filenamePolicy,
- AvroCoder.of(type, schema),
- codec,
- metadata));
- } else {
- write = WriteFiles.to(
- new AvroSink<>(
- filenamePrefix,
- filenameSuffix,
- shardTemplate,
- AvroCoder.of(type, schema),
- codec,
- metadata));
- }
- if (getNumShards() > 0) {
- write = write.withNumShards(getNumShards());
- }
- if (windowedWrites) {
- write = write.withWindowedWrites();
- }
- return input.apply("Write", write);
- }
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .add(DisplayData.item("schema", type)
+ .withLabel("Record Schema"))
+ .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)
+ .withLabel("Output File Prefix"))
+ .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
+ .withLabel("Output Shard Name Template"),
+ DEFAULT_SHARD_TEMPLATE)
+ .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
+ .withLabel("Output File Suffix"),
+ "")
+ .addIfNotDefault(DisplayData.item("numShards", numShards)
+ .withLabel("Maximum Output Shards"),
+ 0)
+ .addIfNotDefault(DisplayData.item("codec", codec.toString())
+ .withLabel("Avro Compression Codec"),
+ DEFAULT_CODEC.toString());
+ builder.include("Metadata", new Metadata());
+ }
+ private class Metadata implements HasDisplayData {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .add(DisplayData.item("schema", type)
- .withLabel("Record Schema"))
- .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)
- .withLabel("Output File Prefix"))
- .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
- .withLabel("Output Shard Name Template"),
- DEFAULT_SHARD_TEMPLATE)
- .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
- .withLabel("Output File Suffix"),
- "")
- .addIfNotDefault(DisplayData.item("numShards", numShards)
- .withLabel("Maximum Output Shards"),
- 0)
- .addIfNotDefault(DisplayData.item("codec", codec.toString())
- .withLabel("Avro Compression Codec"),
- DEFAULT_CODEC.toString());
- builder.include("Metadata", new Metadata());
- }
-
- private class Metadata implements HasDisplayData {
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- for (Map.Entry<String, Object> entry : metadata.entrySet()) {
- DisplayData.Type type = DisplayData.inferType(entry.getValue());
- if (type != null) {
- builder.add(DisplayData.item(entry.getKey(), type, entry.getValue()));
- } else {
- String base64 = BaseEncoding.base64().encode((byte[]) entry.getValue());
- String repr = base64.length() <= METADATA_BYTES_MAX_LENGTH
- ? base64 : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + "...";
- builder.add(DisplayData.item(entry.getKey(), repr));
- }
+ for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+ DisplayData.Type type = DisplayData.inferType(entry.getValue());
+ if (type != null) {
+ builder.add(DisplayData.item(entry.getKey(), type, entry.getValue()));
+ } else {
+ String base64 = BaseEncoding.base64().encode((byte[]) entry.getValue());
+ String repr = base64.length() <= METADATA_BYTES_MAX_LENGTH
+ ? base64 : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + "...";
+ builder.add(DisplayData.item(entry.getKey(), repr));
}
}
}
+ }
- /**
- * Returns the current shard name template string.
- */
- public String getShardNameTemplate() {
- return shardTemplate;
- }
-
- @Override
- protected Coder<Void> getDefaultOutputCoder() {
- return VoidCoder.of();
- }
+ /**
+ * Returns the current shard name template string.
+ */
+ public String getShardNameTemplate() {
+ return shardTemplate;
+ }
- public String getFilenamePrefix() {
- return filenamePrefix;
- }
+ @Override
+ protected Coder<Void> getDefaultOutputCoder() {
+ return VoidCoder.of();
+ }
- public String getShardTemplate() {
- return shardTemplate;
- }
+ public String getFilenamePrefix() {
+ return filenamePrefix;
+ }
- public int getNumShards() {
- return numShards;
- }
+ public String getShardTemplate() {
+ return shardTemplate;
+ }
- public String getFilenameSuffix() {
- return filenameSuffix;
- }
+ public int getNumShards() {
+ return numShards;
+ }
- public Class<T> getType() {
- return type;
- }
+ public String getFilenameSuffix() {
+ return filenameSuffix;
+ }
- public Schema getSchema() {
- return schema;
- }
+ public Class<T> getType() {
+ return type;
+ }
- public CodecFactory getCodec() {
- return codec.getCodec();
- }
+ public Schema getSchema() {
+ return schema;
+ }
- public Map<String, Object> getMetadata() {
- return metadata;
- }
+ public CodecFactory getCodec() {
+ return codec.getCodec();
}
- /** Disallow construction of utility class. */
- private Write() {}
+ public Map<String, Object> getMetadata() {
+ return metadata;
+ }
}
// Pattern which matches old-style shard output patterns, which are now
http://git-wip-us.apache.org/repos/asf/beam/blob/d1dfd4e2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 2144b0d..7df1b18 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -51,7 +51,6 @@ import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.io.AvroIO.Write.Bound;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
@@ -104,7 +103,7 @@ public class AvroIOTest {
@Test
public void testAvroIOGetName() {
assertEquals("AvroIO.Read", AvroIO.read().from("gs://bucket/foo*/baz").getName());
- assertEquals("AvroIO.Write", AvroIO.Write.to("gs://bucket/foo/baz").getName());
+ assertEquals("AvroIO.Write", AvroIO.write().to("gs://bucket/foo/baz").getName());
}
@DefaultCoder(AvroCoder.class)
@@ -145,7 +144,7 @@ public class AvroIOTest {
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+ .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
.withoutSharding()
.withSchema(GenericClass.class));
p.run();
@@ -169,7 +168,7 @@ public class AvroIOTest {
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+ .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
.withoutSharding()
.withCodec(CodecFactory.deflateCodec(9))
.withSchema(GenericClass.class));
@@ -196,7 +195,7 @@ public class AvroIOTest {
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+ .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
.withoutSharding()
.withSchema(GenericClass.class)
.withCodec(CodecFactory.nullCodec()));
@@ -264,7 +263,7 @@ public class AvroIOTest {
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+ .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
.withoutSharding()
.withSchema(GenericClass.class));
p.run();
@@ -372,7 +371,7 @@ public class AvroIOTest {
windowedAvroWritePipeline
.apply(values)
.apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
- .apply(AvroIO.Write.to(new WindowedFilenamePolicy(outputFilePrefix))
+ .apply(AvroIO.<GenericClass>write().to(new WindowedFilenamePolicy(outputFilePrefix))
.withWindowedWrites()
.withNumShards(2)
.withSchema(GenericClass.class));
@@ -407,14 +406,14 @@ public class AvroIOTest {
@Test
public void testWriteWithDefaultCodec() throws Exception {
- AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+ AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
.to("gs://bucket/foo/baz");
assertEquals(CodecFactory.deflateCodec(6).toString(), write.getCodec().toString());
}
@Test
public void testWriteWithCustomCodec() throws Exception {
- AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+ AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
.to("gs://bucket/foo/baz")
.withCodec(CodecFactory.snappyCodec());
assertEquals(SNAPPY_CODEC, write.getCodec().toString());
@@ -423,11 +422,11 @@ public class AvroIOTest {
@Test
@SuppressWarnings("unchecked")
public void testWriteWithSerDeCustomDeflateCodec() throws Exception {
- AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+ AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
.to("gs://bucket/foo/baz")
.withCodec(CodecFactory.deflateCodec(9));
- AvroIO.Write.Bound<GenericRecord> serdeWrite = SerializableUtils.clone(write);
+ AvroIO.Write<GenericRecord> serdeWrite = SerializableUtils.clone(write);
assertEquals(CodecFactory.deflateCodec(9).toString(), serdeWrite.getCodec().toString());
}
@@ -435,11 +434,11 @@ public class AvroIOTest {
@Test
@SuppressWarnings("unchecked")
public void testWriteWithSerDeCustomXZCodec() throws Exception {
- AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+ AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
.to("gs://bucket/foo/baz")
.withCodec(CodecFactory.xzCodec(9));
- AvroIO.Write.Bound<GenericRecord> serdeWrite = SerializableUtils.clone(write);
+ AvroIO.Write<GenericRecord> serdeWrite = SerializableUtils.clone(write);
assertEquals(CodecFactory.xzCodec(9).toString(), serdeWrite.getCodec().toString());
}
@@ -453,7 +452,7 @@ public class AvroIOTest {
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+ .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
.withoutSharding()
.withSchema(GenericClass.class)
.withMetadata(ImmutableMap.<String, Object>of(
@@ -475,7 +474,8 @@ public class AvroIOTest {
File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
String outputFilePrefix = baseOutputFile.getAbsolutePath();
- Bound<String> write = AvroIO.Write.to(outputFilePrefix).withSchema(String.class);
+ AvroIO.Write<String> write =
+ AvroIO.<String>write().to(outputFilePrefix).withSchema(String.class);
if (numShards > 1) {
System.out.println("NumShards " + numShards);
write = write.withNumShards(numShards);
@@ -556,7 +556,7 @@ public class AvroIOTest {
@Test
public void testWriteDisplayData() {
- AvroIO.Write.Bound<?> write = AvroIO.Write
+ AvroIO.Write<?> write = AvroIO.<GenericClass>write()
.to("foo")
.withShardNameTemplate("-SS-of-NN-")
.withSuffix("bar")
@@ -584,10 +584,9 @@ public class AvroIOTest {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
- AvroIO.Write.Bound<?> write = AvroIO.Write
+ AvroIO.Write<?> write = AvroIO.<GenericRecord>write()
.to(outputPath)
- .withSchema(Schema.create(Schema.Type.STRING))
- .withoutValidation();
+ .withSchema(Schema.create(Schema.Type.STRING));
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
assertThat("AvroIO.Write should include the file pattern in its primitive transform",
http://git-wip-us.apache.org/repos/asf/beam/blob/d1dfd4e2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index b974663..ba7f1b9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -271,16 +271,16 @@ public class AvroIOTransformTest {
ImmutableList.<Object[]>builder()
.add(
new Object[] {
- AvroIO.Write.withSchema(AvroGeneratedUser.class),
+ AvroIO.<AvroGeneratedUser>write().withSchema(AvroGeneratedUser.class),
generatedClass
},
new Object[] {
- AvroIO.Write.withSchema(SCHEMA),
+ AvroIO.write().withSchema(SCHEMA),
fromSchema
},
new Object[] {
- AvroIO.Write.withSchema(SCHEMA_STRING),
+ AvroIO.write().withSchema(SCHEMA_STRING),
fromSchemaString
})
.build();
@@ -288,17 +288,17 @@ public class AvroIOTransformTest {
@SuppressWarnings("DefaultAnnotationParam")
@Parameterized.Parameter(0)
- public AvroIO.Write.Bound writeTransform;
+ public AvroIO.Write writeTransform;
@Parameterized.Parameter(1)
public String testAlias;
- private <T> void runTestWrite(final AvroIO.Write.Bound<T> writeBuilder)
+ private <T> void runTestWrite(final AvroIO.Write<T> writeBuilder)
throws Exception {
final File avroFile = tmpFolder.newFile("file.avro");
final AvroGeneratedUser[] users = generateAvroObjects();
- final AvroIO.Write.Bound<T> write = writeBuilder.to(avroFile.getPath());
+ final AvroIO.Write<T> write = writeBuilder.to(avroFile.getPath());
@SuppressWarnings("unchecked") final
PCollection<T> input =
[05/11] beam git commit: Moves AvroSink to upper level
Posted by jk...@apache.org.
Moves AvroSink to upper level
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0166e199
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0166e199
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0166e199
Branch: refs/heads/master
Commit: 0166e19991af956a48ef99310f5f1916225255aa
Parents: 2fa3c34
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 18:05:00 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 131 ----------------
.../java/org/apache/beam/sdk/io/AvroSink.java | 150 +++++++++++++++++++
2 files changed, 150 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0166e199/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 2031569..75e14d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -19,33 +19,24 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.BaseEncoding;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
import java.util.Map;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.Read.Bounded;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -952,126 +943,4 @@ public class AvroIO {
/** Disallow construction of utility class. */
private AvroIO() {}
-
- /**
- * A {@link FileBasedSink} for Avro files.
- */
- @VisibleForTesting
- static class AvroSink<T> extends FileBasedSink<T> {
- private final AvroCoder<T> coder;
- private final SerializableAvroCodecFactory codec;
- private final ImmutableMap<String, Object> metadata;
-
- @VisibleForTesting
- AvroSink(
- FilenamePolicy filenamePolicy,
- AvroCoder<T> coder,
- SerializableAvroCodecFactory codec,
- ImmutableMap<String, Object> metadata) {
- super(filenamePolicy);
- this.coder = coder;
- this.codec = codec;
- this.metadata = metadata;
- }
-
- @VisibleForTesting
- AvroSink(
- String baseOutputFilename,
- String extension,
- String fileNameTemplate,
- AvroCoder<T> coder,
- SerializableAvroCodecFactory codec,
- ImmutableMap<String, Object> metadata) {
- super(baseOutputFilename, extension, fileNameTemplate);
- this.coder = coder;
- this.codec = codec;
- this.metadata = metadata;
- }
-
- @Override
- public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation() {
- return new AvroWriteOperation<>(this, coder, codec, metadata);
- }
-
- /**
- * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation
- * FileBasedWriteOperation} for Avro files.
- */
- private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T> {
- private final AvroCoder<T> coder;
- private final SerializableAvroCodecFactory codec;
- private final ImmutableMap<String, Object> metadata;
-
- private AvroWriteOperation(AvroSink<T> sink,
- AvroCoder<T> coder,
- SerializableAvroCodecFactory codec,
- ImmutableMap<String, Object> metadata) {
- super(sink);
- this.coder = coder;
- this.codec = codec;
- this.metadata = metadata;
- }
-
- @Override
- public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception {
- return new AvroWriter<>(this, coder, codec, metadata);
- }
- }
-
- /**
- * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter}
- * for Avro files.
- */
- private static class AvroWriter<T> extends FileBasedWriter<T> {
- private final AvroCoder<T> coder;
- private DataFileWriter<T> dataFileWriter;
- private SerializableAvroCodecFactory codec;
- private final ImmutableMap<String, Object> metadata;
-
- public AvroWriter(FileBasedWriteOperation<T> writeOperation,
- AvroCoder<T> coder,
- SerializableAvroCodecFactory codec,
- ImmutableMap<String, Object> metadata) {
- super(writeOperation, MimeTypes.BINARY);
- this.coder = coder;
- this.codec = codec;
- this.metadata = metadata;
- }
-
- @SuppressWarnings("deprecation") // uses internal test functionality.
- @Override
- protected void prepareWrite(WritableByteChannel channel) throws Exception {
- DatumWriter<T> datumWriter = coder.getType().equals(GenericRecord.class)
- ? new GenericDatumWriter<T>(coder.getSchema())
- : new ReflectDatumWriter<T>(coder.getSchema());
-
- dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec.getCodec());
- for (Map.Entry<String, Object> entry : metadata.entrySet()) {
- Object v = entry.getValue();
- if (v instanceof String) {
- dataFileWriter.setMeta(entry.getKey(), (String) v);
- } else if (v instanceof Long) {
- dataFileWriter.setMeta(entry.getKey(), (Long) v);
- } else if (v instanceof byte[]) {
- dataFileWriter.setMeta(entry.getKey(), (byte[]) v);
- } else {
- throw new IllegalStateException(
- "Metadata value type must be one of String, Long, or byte[]. Found "
- + v.getClass().getSimpleName());
- }
- }
- dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel));
- }
-
- @Override
- public void write(T value) throws Exception {
- dataFileWriter.append(value);
- }
-
- @Override
- protected void finishWrite() throws Exception {
- dataFileWriter.flush();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0166e199/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
new file mode 100644
index 0000000..16f233c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.common.collect.ImmutableMap;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.Map;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.MimeTypes;
+
+/**
+ * A {@link FileBasedSink} for Avro files.
+ */
+class AvroSink<T> extends FileBasedSink<T> {
+ private final AvroCoder<T> coder;
+ private final SerializableAvroCodecFactory codec;
+ private final ImmutableMap<String, Object> metadata;
+
+ AvroSink(
+ FilenamePolicy filenamePolicy,
+ AvroCoder<T> coder,
+ SerializableAvroCodecFactory codec,
+ ImmutableMap<String, Object> metadata) {
+ super(filenamePolicy);
+ this.coder = coder;
+ this.codec = codec;
+ this.metadata = metadata;
+ }
+
+ AvroSink(
+ String baseOutputFilename,
+ String extension,
+ String fileNameTemplate,
+ AvroCoder<T> coder,
+ SerializableAvroCodecFactory codec,
+ ImmutableMap<String, Object> metadata) {
+ super(baseOutputFilename, extension, fileNameTemplate);
+ this.coder = coder;
+ this.codec = codec;
+ this.metadata = metadata;
+ }
+
+ @Override
+ public FileBasedWriteOperation<T> createWriteOperation() {
+ return new AvroWriteOperation<>(this, coder, codec, metadata);
+ }
+
+ /**
+ * A {@link FileBasedWriteOperation
+ * FileBasedWriteOperation} for Avro files.
+ */
+ private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T> {
+ private final AvroCoder<T> coder;
+ private final SerializableAvroCodecFactory codec;
+ private final ImmutableMap<String, Object> metadata;
+
+ private AvroWriteOperation(AvroSink<T> sink,
+ AvroCoder<T> coder,
+ SerializableAvroCodecFactory codec,
+ ImmutableMap<String, Object> metadata) {
+ super(sink);
+ this.coder = coder;
+ this.codec = codec;
+ this.metadata = metadata;
+ }
+
+ @Override
+ public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception {
+ return new AvroWriter<>(this, coder, codec, metadata);
+ }
+ }
+
+ /**
+ * A {@link FileBasedWriter FileBasedWriter}
+ * for Avro files.
+ */
+ private static class AvroWriter<T> extends FileBasedWriter<T> {
+ private final AvroCoder<T> coder;
+ private DataFileWriter<T> dataFileWriter;
+ private SerializableAvroCodecFactory codec;
+ private final ImmutableMap<String, Object> metadata;
+
+ public AvroWriter(FileBasedWriteOperation<T> writeOperation,
+ AvroCoder<T> coder,
+ SerializableAvroCodecFactory codec,
+ ImmutableMap<String, Object> metadata) {
+ super(writeOperation, MimeTypes.BINARY);
+ this.coder = coder;
+ this.codec = codec;
+ this.metadata = metadata;
+ }
+
+ @SuppressWarnings("deprecation") // uses internal test functionality.
+ @Override
+ protected void prepareWrite(WritableByteChannel channel) throws Exception {
+ DatumWriter<T> datumWriter = coder.getType().equals(GenericRecord.class)
+ ? new GenericDatumWriter<T>(coder.getSchema())
+ : new ReflectDatumWriter<T>(coder.getSchema());
+
+ dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec.getCodec());
+ for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+ Object v = entry.getValue();
+ if (v instanceof String) {
+ dataFileWriter.setMeta(entry.getKey(), (String) v);
+ } else if (v instanceof Long) {
+ dataFileWriter.setMeta(entry.getKey(), (Long) v);
+ } else if (v instanceof byte[]) {
+ dataFileWriter.setMeta(entry.getKey(), (byte[]) v);
+ } else {
+ throw new IllegalStateException(
+ "Metadata value type must be one of String, Long, or byte[]. Found "
+ + v.getClass().getSimpleName());
+ }
+ }
+ dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel));
+ }
+
+ @Override
+ public void write(T value) throws Exception {
+ dataFileWriter.append(value);
+ }
+
+ @Override
+ protected void finishWrite() throws Exception {
+ dataFileWriter.flush();
+ }
+ }
+}
[03/11] beam git commit: Converts AvroIO.Read to AutoValue
Posted by jk...@apache.org.
Converts AvroIO.Read to AutoValue
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/439f2ca0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/439f2ca0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/439f2ca0
Branch: refs/heads/master
Commit: 439f2ca03c0d8994e5736b9493f61d9cb4267cb2
Parents: ff7a1d4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 18:37:49 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 67 +++++++++-----------
1 file changed, 29 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/439f2ca0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index ed172d1..2f1d917 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.BaseEncoding;
@@ -130,12 +131,15 @@ public class AvroIO {
* <p>The schema must be specified using one of the {@code withSchema} functions.
*/
public static <T> Read<T> read() {
- return new Read<>();
+ return new AutoValue_AvroIO_Read.Builder<T>().build();
}
/** Reads Avro file(s) containing records of the specified schema. */
public static Read<GenericRecord> readGenericRecords(Schema schema) {
- return new Read<>(null, null, GenericRecord.class, schema);
+ return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
+ .setRecordClass(GenericRecord.class)
+ .setSchema(schema)
+ .build();
}
/**
@@ -146,26 +150,21 @@ public class AvroIO {
}
/** Implementation of {@link #read}. */
- public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
- /** The filepattern to read from. */
- @Nullable
- final String filepattern;
- /** The class type of the records. */
- @Nullable
- final Class<T> type;
- /** The schema of the input file. */
- @Nullable
- final Schema schema;
-
- Read() {
- this(null, null, null, null);
- }
+ @AutoValue
+ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+ @Nullable abstract String getFilepattern();
+ @Nullable abstract Class<T> getRecordClass();
+ @Nullable abstract Schema getSchema();
+
+ abstract Builder<T> toBuilder();
- Read(String name, String filepattern, Class<T> type, Schema schema) {
- super(name);
- this.filepattern = filepattern;
- this.type = type;
- this.schema = schema;
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setFilepattern(String filepattern);
+ abstract Builder<T> setRecordClass(Class<T> recordClass);
+ abstract Builder<T> setSchema(Schema schema);
+
+ abstract Read<T> build();
}
/**
@@ -178,7 +177,7 @@ public class AvroIO {
* Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
*/
public Read<T> from(String filepattern) {
- return new Read<>(name, filepattern, type, schema);
+ return toBuilder().setFilepattern(filepattern).build();
}
/**
@@ -187,26 +186,26 @@ public class AvroIO {
* specified Avro-generated class.
*/
public Read<T> withSchema(Class<T> type) {
- return new Read<>(name, filepattern, type, ReflectData.get().getSchema(type));
+ return toBuilder().setRecordClass(type).setSchema(ReflectData.get().getSchema(type)).build();
}
@Override
public PCollection<T> expand(PBegin input) {
- if (filepattern == null) {
+ if (getFilepattern() == null) {
throw new IllegalStateException(
"need to set the filepattern of an AvroIO.Read transform");
}
- if (schema == null) {
+ if (getSchema() == null) {
throw new IllegalStateException("need to set the schema of an AvroIO.Read transform");
}
@SuppressWarnings("unchecked")
Bounded<T> read =
- type == GenericRecord.class
+ getRecordClass() == GenericRecord.class
? (Bounded<T>) org.apache.beam.sdk.io.Read.from(
- AvroSource.from(filepattern).withSchema(schema))
+ AvroSource.from(getFilepattern()).withSchema(getSchema()))
: org.apache.beam.sdk.io.Read.from(
- AvroSource.from(filepattern).withSchema(type));
+ AvroSource.from(getFilepattern()).withSchema(getRecordClass()));
PCollection<T> pcol = input.getPipeline().apply("Read", read);
// Honor the default output coder that would have been used by this PTransform.
@@ -218,21 +217,13 @@ public class AvroIO {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
- .addIfNotNull(DisplayData.item("filePattern", filepattern)
+ .addIfNotNull(DisplayData.item("filePattern", getFilepattern())
.withLabel("Input File Pattern"));
}
@Override
protected Coder<T> getDefaultOutputCoder() {
- return AvroCoder.of(type, schema);
- }
-
- public String getFilepattern() {
- return filepattern;
- }
-
- public Schema getSchema() {
- return schema;
+ return AvroCoder.of(getRecordClass(), getSchema());
}
}