You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/28 17:53:06 UTC
[2/4] beam git commit: [BEAM-2677] AvroIO.parseGenericRecords -
schemaless AvroIO.read
[BEAM-2677] AvroIO.parseGenericRecords - schemaless AvroIO.read
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ebd00411
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ebd00411
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ebd00411
Branch: refs/heads/master
Commit: ebd004119c387787d0e0fcd0487e1b2754c7dbc5
Parents: 62c922b
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Jul 24 15:07:15 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 28 10:25:07 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 199 ++++++++++++++++++-
.../java/org/apache/beam/sdk/io/AvroSource.java | 166 ++++++++++++----
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 89 ++++++---
.../org/apache/beam/sdk/io/AvroSourceTest.java | 30 ++-
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 4 +-
5 files changed, 406 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 018b84f..27c9073 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -35,7 +35,9 @@ import org.apache.avro.reflect.ReflectData;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
@@ -53,13 +55,16 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
/**
* {@link PTransform}s for reading and writing Avro files.
*
- * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()}, using
- * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. Alternatively, if
- * the filepatterns to be read are themselves in a {@link PCollection}, apply {@link #readAll}.
+ * <p>To read a {@link PCollection} from one or more Avro files with the same schema known at
+ * pipeline construction time, use {@code AvroIO.read()}, using {@link AvroIO.Read#from} to specify
+ * the filename or filepattern to read from. Alternatively, if the filepatterns to be read are
+ * themselves in a {@link PCollection}, apply {@link #readAll}.
*
* <p>See {@link FileSystems} for information on supported file systems and filepatterns.
*
@@ -70,6 +75,12 @@ import org.apache.beam.sdk.values.PDone;
* schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link
* #readAllGenericRecords}.
*
+ * <p>To read records from files whose schema is unknown at pipeline construction time or differs
+ * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a
+ * parsing function for converting each {@link GenericRecord} into a value of your custom type.
+ * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link
+ * #parseAllGenericRecords}.
+ *
* <p>For example:
*
* <pre>{@code
@@ -84,12 +95,20 @@ import org.apache.beam.sdk.values.PDone;
* PCollection<GenericRecord> records =
* p.apply(AvroIO.readGenericRecords(schema)
* .from("gs://my_bucket/path/to/records-*.avro"));
+ *
+ * PCollection<Foo> records =
+ * p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
+ * public Foo apply(GenericRecord record) {
+ * // If needed, access the schema of the record using record.getSchema()
+ * return ...;
+ * }
+ * }));
* }</pre>
*
* <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
- * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
- * scalability. Note that it may decrease performance if the filepattern matches only a small
- * number of files.
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} or {@link
+ * Parse#withHintMatchesManyFiles} for better performance and scalability. Note that it may decrease
+ * performance if the filepattern matches only a small number of files.
*
* <p>Reading from a {@link PCollection} of filepatterns:
*
@@ -101,6 +120,8 @@ import org.apache.beam.sdk.values.PDone;
* filepatterns.apply(AvroIO.read(AvroAutoGenClass.class));
* PCollection<GenericRecord> genericRecords =
* filepatterns.apply(AvroIO.readGenericRecords(schema));
+ * PCollection<Foo> records =
+ * filepatterns.apply(AvroIO.parseAllGenericRecords(new SerializableFunction...));
* }</pre>
*
* <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using
@@ -208,6 +229,29 @@ public class AvroIO {
}
/**
+ * Reads Avro file(s) containing records of an unspecified schema, converting each record to a
+ * custom type.
+ */
+ public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
+ return new AutoValue_AvroIO_Parse.Builder<T>()
+ .setParseFn(parseFn)
+ .setHintMatchesManyFiles(false)
+ .build();
+ }
+
+ /**
+ * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each filepattern in the
+ * input {@link PCollection}.
+ */
+ public static <T> ParseAll<T> parseAllGenericRecords(
+ SerializableFunction<GenericRecord, T> parseFn) {
+ return new AutoValue_AvroIO_ParseAll.Builder<T>()
+ .setParseFn(parseFn)
+ .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
+ .build();
+ }
+
+ /**
* Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
* pattern).
*/
@@ -387,6 +431,149 @@ public class AvroIO {
/////////////////////////////////////////////////////////////////////////////
+ /** Implementation of {@link #parseGenericRecords}. */
+ @AutoValue
+ public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
+ @Nullable abstract ValueProvider<String> getFilepattern();
+ abstract SerializableFunction<GenericRecord, T> getParseFn();
+ @Nullable abstract Coder<T> getCoder();
+ abstract boolean getHintMatchesManyFiles();
+
+ abstract Builder<T> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+ abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
+ abstract Builder<T> setCoder(Coder<T> coder);
+ abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles);
+
+ abstract Parse<T> build();
+ }
+
+ /** Reads from the given filename or filepattern. */
+ public Parse<T> from(String filepattern) {
+ return from(StaticValueProvider.of(filepattern));
+ }
+
+ /** Like {@link #from(String)}. */
+ public Parse<T> from(ValueProvider<String> filepattern) {
+ return toBuilder().setFilepattern(filepattern).build();
+ }
+
+ /** Sets a coder for the result of the parse function. */
+ public Parse<T> withCoder(Coder<T> coder) {
+ return toBuilder().setCoder(coder).build();
+ }
+
+ /** Like {@link Read#withHintMatchesManyFiles()}. */
+ public Parse<T> withHintMatchesManyFiles() {
+ return toBuilder().setHintMatchesManyFiles(true).build();
+ }
+
+ @Override
+ public PCollection<T> expand(PBegin input) {
+ checkNotNull(getFilepattern(), "filepattern");
+ Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
+ if (getHintMatchesManyFiles()) {
+ return input
+ .apply(Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+ .apply(parseAllGenericRecords(getParseFn()).withCoder(coder));
+ }
+ return input.apply(
+ org.apache.beam.sdk.io.Read.from(
+ AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)));
+ }
+
+ private static <T> Coder<T> inferCoder(
+ @Nullable Coder<T> explicitCoder,
+ SerializableFunction<GenericRecord, T> parseFn,
+ CoderRegistry coderRegistry) {
+ if (explicitCoder != null) {
+ return explicitCoder;
+ }
+ // If a coder was not specified explicitly, infer it from parse fn.
+ TypeDescriptor<T> descriptor = TypeDescriptors.outputOf(parseFn);
+ String message =
+ "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().";
+ checkArgument(descriptor != null, message);
+ try {
+ return coderRegistry.getCoder(descriptor);
+ } catch (CannotProvideCoderException e) {
+ throw new IllegalArgumentException(message, e);
+ }
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .addIfNotNull(
+ DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+ .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+ }
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ /** Implementation of {@link #parseAllGenericRecords}. */
+ @AutoValue
+ public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+ abstract SerializableFunction<GenericRecord, T> getParseFn();
+ @Nullable abstract Coder<T> getCoder();
+ abstract long getDesiredBundleSizeBytes();
+
+ abstract Builder<T> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
+ abstract Builder<T> setCoder(Coder<T> coder);
+ abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+ abstract ParseAll<T> build();
+ }
+
+ /** Specifies the coder for the result of the {@code parseFn}. */
+ public ParseAll<T> withCoder(Coder<T> coder) {
+ return toBuilder().setCoder(coder).build();
+ }
+
+ @VisibleForTesting
+ ParseAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+ return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+ }
+
+ @Override
+ public PCollection<T> expand(PCollection<String> input) {
+ final Coder<T> coder =
+ Parse.inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
+ SerializableFunction<String, FileBasedSource<T>> createSource =
+ new SerializableFunction<String, FileBasedSource<T>>() {
+ @Override
+ public FileBasedSource<T> apply(String input) {
+ return AvroSource.from(input).withParseFn(getParseFn(), coder);
+ }
+ };
+ return input
+ .apply(
+ "Parse all via FileBasedSource",
+ new ReadAllViaFileBasedSource<>(
+ SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */,
+ getDesiredBundleSizeBytes(),
+ createSource))
+ .setCoder(coder);
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+ }
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
/** Implementation of {@link #write}. */
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index a98d870..d277503 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
@@ -27,8 +28,10 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidObjectException;
+import java.io.ObjectInputStream;
import java.io.ObjectStreamException;
import java.io.PushbackInputStream;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
@@ -53,10 +56,12 @@ import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;
@@ -130,19 +135,84 @@ public class AvroSource<T> extends BlockBasedSource<T> {
// The default sync interval is 64k.
private static final long DEFAULT_MIN_BUNDLE_SIZE = 2 * DataFileConstants.DEFAULT_SYNC_INTERVAL;
- // The type of the records contained in the file.
- private final Class<T> type;
+ // Use cases of AvroSource are:
+ // 1) AvroSource<GenericRecord> Reading GenericRecord records with a specified schema.
+ // 2) AvroSource<Foo> Reading records of a generated Avro class Foo.
+ // 3) AvroSource<T> Reading GenericRecord records with an unspecified schema
+ // and converting them to type T.
+ // | Case 1 | Case 2 | Case 3 |
+ // type | GenericRecord | Foo | GenericRecord |
+ // readerSchemaString | non-null | non-null | null |
+ // parseFn | null | null | non-null |
+ // outputCoder | null | null | non-null |
+ private static class Mode<T> implements Serializable {
+ private final Class<?> type;
+
+ // The JSON schema used to decode records.
+ @Nullable
+ private String readerSchemaString;
+
+ @Nullable
+ private final SerializableFunction<GenericRecord, T> parseFn;
+
+ @Nullable
+ private final Coder<T> outputCoder;
+
+ private Mode(
+ Class<?> type,
+ @Nullable String readerSchemaString,
+ @Nullable SerializableFunction<GenericRecord, T> parseFn,
+ @Nullable Coder<T> outputCoder) {
+ this.type = type;
+ this.readerSchemaString = internSchemaString(readerSchemaString);
+ this.parseFn = parseFn;
+ this.outputCoder = outputCoder;
+ }
- // The JSON schema used to decode records.
- @Nullable
- private final String readerSchemaString;
+ private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
+ is.defaultReadObject();
+ readerSchemaString = internSchemaString(readerSchemaString);
+ }
+
+ private Coder<T> getOutputCoder() {
+ if (parseFn == null) {
+ return AvroCoder.of((Class<T>) type, internOrParseSchemaString(readerSchemaString));
+ } else {
+ return outputCoder;
+ }
+ }
+
+ private void validate() {
+ if (parseFn == null) {
+ checkArgument(
+ readerSchemaString != null,
+ "schema must be specified using withSchema() when not using a parse fn");
+ }
+ }
+ }
+
+ private static Mode<GenericRecord> readGenericRecordsWithSchema(String schema) {
+ return new Mode<>(GenericRecord.class, schema, null, null);
+ }
+ private static <T> Mode<T> readGeneratedClasses(Class<T> clazz) {
+ return new Mode<>(clazz, ReflectData.get().getSchema(clazz).toString(), null, null);
+ }
+ private static <T> Mode<T> parseGenericRecords(
+ SerializableFunction<GenericRecord, T> parseFn, Coder<T> outputCoder) {
+ return new Mode<>(GenericRecord.class, null, parseFn, outputCoder);
+ }
+
+ private final Mode<T> mode;
/**
- * Reads from the given file name or pattern ("glob"). The returned source can be further
+ * Reads from the given file name or pattern ("glob"). The returned source needs to be further
* configured by calling {@link #withSchema} to return a type other than {@link GenericRecord}.
*/
public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPattern) {
- return new AvroSource<>(fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class);
+ return new AvroSource<>(
+ fileNameOrPattern,
+ DEFAULT_MIN_BUNDLE_SIZE,
+ readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));
}
/** Like {@link #from(ValueProvider)}. */
@@ -152,23 +222,40 @@ public class AvroSource<T> extends BlockBasedSource<T> {
/** Reads files containing records that conform to the given schema. */
public AvroSource<GenericRecord> withSchema(String schema) {
+ checkNotNull(schema, "schema");
return new AvroSource<>(
- getFileOrPatternSpecProvider(), getMinBundleSize(), schema, GenericRecord.class);
+ getFileOrPatternSpecProvider(),
+ getMinBundleSize(),
+ readGenericRecordsWithSchema(schema));
}
/** Like {@link #withSchema(String)}. */
public AvroSource<GenericRecord> withSchema(Schema schema) {
- return new AvroSource<>(
- getFileOrPatternSpecProvider(), getMinBundleSize(), schema.toString(), GenericRecord.class);
+ checkNotNull(schema, "schema");
+ return withSchema(schema.toString());
}
/** Reads files containing records of the given class. */
public <X> AvroSource<X> withSchema(Class<X> clazz) {
+ checkNotNull(clazz, "clazz");
+ return new AvroSource<>(
+ getFileOrPatternSpecProvider(),
+ getMinBundleSize(),
+ readGeneratedClasses(clazz));
+ }
+
+ /**
+ * Reads {@link GenericRecord}s of unspecified schema and maps them to instances of a custom type
+ * using the given {@code parseFn} and encoded using the given coder.
+ */
+ public <X> AvroSource<X> withParseFn(
+ SerializableFunction<GenericRecord, X> parseFn, Coder<X> coder) {
+ checkNotNull(parseFn, "parseFn");
+ checkNotNull(coder, "coder");
return new AvroSource<>(
getFileOrPatternSpecProvider(),
getMinBundleSize(),
- ReflectData.get().getSchema(clazz).toString(),
- clazz);
+ parseGenericRecords(parseFn, coder));
}
/**
@@ -176,19 +263,16 @@ public class AvroSource<T> extends BlockBasedSource<T> {
* minBundleSize} and its use.
*/
public AvroSource<T> withMinBundleSize(long minBundleSize) {
- return new AvroSource<>(
- getFileOrPatternSpecProvider(), minBundleSize, readerSchemaString, type);
+ return new AvroSource<>(getFileOrPatternSpecProvider(), minBundleSize, mode);
}
/** Constructor for FILEPATTERN mode. */
private AvroSource(
ValueProvider<String> fileNameOrPattern,
long minBundleSize,
- String readerSchemaString,
- Class<T> type) {
+ Mode<T> mode) {
super(fileNameOrPattern, minBundleSize);
- this.readerSchemaString = internSchemaString(readerSchemaString);
- this.type = type;
+ this.mode = mode;
}
/** Constructor for SINGLE_FILE_OR_SUBRANGE mode. */
@@ -197,18 +281,15 @@ public class AvroSource<T> extends BlockBasedSource<T> {
long minBundleSize,
long startOffset,
long endOffset,
- String readerSchemaString,
- Class<T> type) {
+ Mode<T> mode) {
super(metadata, minBundleSize, startOffset, endOffset);
- this.readerSchemaString = internSchemaString(readerSchemaString);
- this.type = type;
+ this.mode = mode;
}
@Override
public void validate() {
- // AvroSource objects do not need to be configured with more than a file pattern. Overridden to
- // make this explicit.
super.validate();
+ mode.validate();
}
/**
@@ -225,7 +306,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
@Override
public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) {
- return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, readerSchemaString, type);
+ return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, mode);
}
@Override
@@ -234,14 +315,14 @@ public class AvroSource<T> extends BlockBasedSource<T> {
}
@Override
- public AvroCoder<T> getDefaultOutputCoder() {
- return AvroCoder.of(type, internOrParseSchemaString(readerSchemaString));
+ public Coder<T> getDefaultOutputCoder() {
+ return mode.getOutputCoder();
}
@VisibleForTesting
@Nullable
String getReaderSchemaString() {
- return readerSchemaString;
+ return mode.readerSchemaString;
}
/** Avro file metadata. */
@@ -380,15 +461,9 @@ public class AvroSource<T> extends BlockBasedSource<T> {
switch (getMode()) {
case SINGLE_FILE_OR_SUBRANGE:
return new AvroSource<>(
- getSingleFileMetadata(),
- getMinBundleSize(),
- getStartOffset(),
- getEndOffset(),
- readerSchemaString,
- type);
+ getSingleFileMetadata(), getMinBundleSize(), getStartOffset(), getEndOffset(), mode);
case FILEPATTERN:
- return new AvroSource<>(
- getFileOrPatternSpecProvider(), getMinBundleSize(), readerSchemaString, type);
+ return new AvroSource<>(getFileOrPatternSpecProvider(), getMinBundleSize(), mode);
default:
throw new InvalidObjectException(
String.format("Unknown mode %s for AvroSource %s", getMode(), this));
@@ -402,6 +477,8 @@ public class AvroSource<T> extends BlockBasedSource<T> {
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
static class AvroBlock<T> extends Block<T> {
+ private final Mode<T> mode;
+
// The number of records in the block.
private final long numRecords;
@@ -412,7 +489,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
private long currentRecordIndex = 0;
// A DatumReader to read records from the block.
- private final DatumReader<T> reader;
+ private final DatumReader<?> reader;
// A BinaryDecoder used by the reader to decode records.
private final BinaryDecoder decoder;
@@ -455,19 +532,19 @@ public class AvroSource<T> extends BlockBasedSource<T> {
AvroBlock(
byte[] data,
long numRecords,
- Class<? extends T> type,
- String readerSchemaString,
+ Mode<T> mode,
String writerSchemaString,
String codec)
throws IOException {
+ this.mode = mode;
this.numRecords = numRecords;
checkNotNull(writerSchemaString, "writerSchemaString");
Schema writerSchema = internOrParseSchemaString(writerSchemaString);
Schema readerSchema =
internOrParseSchemaString(
- MoreObjects.firstNonNull(readerSchemaString, writerSchemaString));
+ MoreObjects.firstNonNull(mode.readerSchemaString, writerSchemaString));
this.reader =
- (type == GenericRecord.class)
+ (mode.type == GenericRecord.class)
? new GenericDatumReader<T>(writerSchema, readerSchema)
: new ReflectDatumReader<T>(writerSchema, readerSchema);
this.decoder = DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, codec), null);
@@ -483,7 +560,9 @@ public class AvroSource<T> extends BlockBasedSource<T> {
if (currentRecordIndex >= numRecords) {
return false;
}
- currentRecord = reader.read(null, decoder);
+ Object record = reader.read(null, decoder);
+ currentRecord =
+ (mode.parseFn == null) ? ((T) record) : mode.parseFn.apply((GenericRecord) record);
currentRecordIndex++;
return true;
}
@@ -585,8 +664,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
new AvroBlock<>(
data,
numRecords,
- getCurrentSource().type,
- getCurrentSource().readerSchemaString,
+ getCurrentSource().mode,
metadata.getSchemaString(),
metadata.getCodec());
http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 90cd824..154ff5a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -67,6 +67,7 @@ import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -114,9 +115,9 @@ public class AvroIOTest {
public GenericClass() {}
- public GenericClass(int intValue, String stringValue) {
- this.intField = intValue;
- this.stringField = stringValue;
+ public GenericClass(int intField, String stringField) {
+ this.intField = intField;
+ this.stringField = stringField;
}
@Override
@@ -142,9 +143,18 @@ public class AvroIOTest {
}
}
+ private static class ParseGenericClass
+ implements SerializableFunction<GenericRecord, GenericClass> {
+ @Override
+ public GenericClass apply(GenericRecord input) {
+ return new GenericClass(
+ (int) input.get("intField"), input.get("stringField").toString());
+ }
+ }
+
@Test
@Category(NeedsRunner.class)
- public void testAvroIOWriteAndReadASingleFile() throws Throwable {
+ public void testAvroIOWriteAndReadAndParseASingleFile() throws Throwable {
List<GenericClass> values =
ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
@@ -153,23 +163,45 @@ public class AvroIOTest {
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
writePipeline.run().waitUntilFinish();
- // Test the same data via read(), read().withHintMatchesManyFiles(), and readAll()
+ // Test the same data using all versions of read().
+ PCollection<String> path =
+ readPipeline.apply("Create path", Create.of(outputFile.getAbsolutePath()));
PAssert.that(
- readPipeline.apply(
- "Read", AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
+ readPipeline.apply(
+ "Read", AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
.containsInAnyOrder(values);
PAssert.that(
- readPipeline.apply(
- "Read withHintMatchesManyFiles",
- AvroIO.read(GenericClass.class)
- .from(outputFile.getAbsolutePath())
- .withHintMatchesManyFiles()))
+ readPipeline.apply(
+ "Read withHintMatchesManyFiles",
+ AvroIO.read(GenericClass.class)
+ .from(outputFile.getAbsolutePath())
+ .withHintMatchesManyFiles()))
.containsInAnyOrder(values);
PAssert.that(
- "ReadAll",
- readPipeline
- .apply(Create.of(outputFile.getAbsolutePath()))
- .apply(AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+ path.apply(
+ "ReadAll", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+ .containsInAnyOrder(values);
+ PAssert.that(
+ readPipeline.apply(
+ "Parse",
+ AvroIO.parseGenericRecords(new ParseGenericClass())
+ .from(outputFile.getAbsolutePath())
+ .withCoder(AvroCoder.of(GenericClass.class))))
+ .containsInAnyOrder(values);
+ PAssert.that(
+ readPipeline.apply(
+ "Parse withHintMatchesManyFiles",
+ AvroIO.parseGenericRecords(new ParseGenericClass())
+ .from(outputFile.getAbsolutePath())
+ .withCoder(AvroCoder.of(GenericClass.class))
+ .withHintMatchesManyFiles()))
+ .containsInAnyOrder(values);
+ PAssert.that(
+ path.apply(
+ "ParseAll",
+ AvroIO.parseAllGenericRecords(new ParseGenericClass())
+ .withCoder(AvroCoder.of(GenericClass.class))
+ .withDesiredBundleSizeBytes(10)))
.containsInAnyOrder(values);
readPipeline.run();
@@ -200,7 +232,7 @@ public class AvroIOTest {
.withNumShards(3));
writePipeline.run().waitUntilFinish();
- // Test both read() and readAll()
+ // Test read(), readAll(), and parseAllGenericRecords().
PAssert.that(
readPipeline.apply(
"Read first",
@@ -213,15 +245,22 @@ public class AvroIOTest {
AvroIO.read(GenericClass.class)
.from(tmpFolder.getRoot().getAbsolutePath() + "/second*")))
.containsInAnyOrder(secondValues);
+ PCollection<String> paths =
+ readPipeline.apply(
+ "Create paths",
+ Create.of(
+ tmpFolder.getRoot().getAbsolutePath() + "/first*",
+ tmpFolder.getRoot().getAbsolutePath() + "/second*"));
+ PAssert.that(
+ paths.apply(
+ "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+ .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
PAssert.that(
- readPipeline
- .apply(
- "Create paths",
- Create.of(
- tmpFolder.getRoot().getAbsolutePath() + "/first*",
- tmpFolder.getRoot().getAbsolutePath() + "/second*"))
- .apply(
- "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+ paths.apply(
+ "Parse all",
+ AvroIO.parseAllGenericRecords(new ParseGenericClass())
+ .withCoder(AvroCoder.of(GenericClass.class))
+ .withDesiredBundleSizeBytes(10)))
.containsInAnyOrder(Iterables.concat(firstValues, secondValues));
readPipeline.run();
http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index bf2ac95..714e029 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -59,6 +59,7 @@ import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.SerializableUtils;
import org.hamcrest.Matchers;
@@ -407,11 +408,6 @@ public class AvroSourceTest {
source = AvroSource.from(filename).withSchema(schemaString);
records = SourceTestUtils.readFromSource(source, null);
assertEqualsWithGeneric(expected, records);
-
- // Create a source with no schema
- source = AvroSource.from(filename);
- records = SourceTestUtils.readFromSource(source, null);
- assertEqualsWithGeneric(expected, records);
}
@Test
@@ -449,6 +445,30 @@ public class AvroSourceTest {
assertSame(sourceA.getReaderSchemaString(), sourceC.getReaderSchemaString());
}
+ @Test
+ public void testParseFn() throws Exception {
+ List<Bird> expected = createRandomRecords(100);
+ String filename = generateTestFile("tmp.avro", expected, SyncBehavior.SYNC_DEFAULT, 0,
+ AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
+
+ AvroSource<Bird> source =
+ AvroSource.from(filename)
+ .withParseFn(
+ new SerializableFunction<GenericRecord, Bird>() {
+ @Override
+ public Bird apply(GenericRecord input) {
+ return new Bird(
+ (long) input.get("number"),
+ input.get("species").toString(),
+ input.get("quality").toString(),
+ (long) input.get("quantity"));
+ }
+ },
+ AvroCoder.of(Bird.class));
+ List<Bird> actual = SourceTestUtils.readFromSource(source, null);
+ assertThat(actual, containsInAnyOrder(expected.toArray()));
+ }
+
private void assertEqualsWithGeneric(List<Bird> expected, List<GenericRecord> actual) {
assertEquals(expected.size(), actual.size());
for (int i = 0; i < expected.size(); i++) {
http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 2b1eafe..6c118a0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -183,8 +183,8 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
for (ResourceId file : files) {
- avroSources.add(new TransformingSource<>(
- AvroSource.from(file.toString()), function, getDefaultOutputCoder()));
+ avroSources.add(
+ AvroSource.from(file.toString()).withParseFn(function, getDefaultOutputCoder()));
}
return ImmutableList.copyOf(avroSources);
}