You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this plain-text rendering.)
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/21 00:06:03 UTC
[2/3] beam git commit: Minor changes to AvroSource in preparation for refactoring
Minor changes to AvroSource in preparation for refactoring
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c52a908c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c52a908c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c52a908c
Branch: refs/heads/master
Commit: c52a908cba7765e120a94909ab02c548d1a124ad
Parents: 7e63d2c
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jul 18 13:40:52 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 20 16:59:11 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroSource.java | 171 ++++++++-----------
.../org/apache/beam/sdk/io/AvroSourceTest.java | 26 +--
2 files changed, 70 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c52a908c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 575218b..0634774 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -37,6 +37,7 @@ import java.util.Map;
import java.util.WeakHashMap;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
@@ -127,15 +128,16 @@ public class AvroSource<T> extends BlockBasedSource<T> {
// The default sync interval is 64k.
private static final long DEFAULT_MIN_BUNDLE_SIZE = 2 * DataFileConstants.DEFAULT_SYNC_INTERVAL;
- // The JSON schema used to encode records.
- private final String readSchemaString;
+ // The type of the records contained in the file.
+ private final Class<T> type;
+
+ // The JSON schema used to decode records.
+ @Nullable
+ private final String readerSchemaString;
// The JSON schema that was used to write the source Avro file (may differ from the schema we will
// use to read from it).
- private final String fileSchemaString;
-
- // The type of the records contained in the file.
- private final Class<T> type;
+ private final String writerSchemaString;
// The following metadata fields are not user-configurable. They are extracted from the object
// container file header upon subsource creation.
@@ -147,87 +149,75 @@ public class AvroSource<T> extends BlockBasedSource<T> {
// The object container file's 16-byte sync marker.
private final byte[] syncMarker;
- // Default output coder, lazily initialized.
- private transient AvroCoder<T> coder = null;
-
- // Schema of the file, lazily initialized.
- private transient Schema fileSchema;
-
- // Schema used to encode records, lazily initialized.
- private transient Schema readSchema;
-
/**
- * Creates an {@link AvroSource} that reads from the given file name or pattern ("glob"). The
- * returned source can be further configured by calling {@link #withSchema} to return a type other
- * than {@link GenericRecord}.
+ * Reads from the given file name or pattern ("glob"). The returned source can be further
+ * configured by calling {@link #withSchema} to return a type other than {@link GenericRecord}.
*/
public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
return new AvroSource<>(
fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class, null, null);
}
- /**
- * Returns an {@link AvroSource} that's like this one but reads files containing records that
- * conform to the given schema.
- *
- * <p>Does not modify this object.
- */
+ /** Reads files containing records that conform to the given schema. */
public AvroSource<GenericRecord> withSchema(String schema) {
return new AvroSource<>(
getFileOrPatternSpec(), getMinBundleSize(), schema, GenericRecord.class, codec, syncMarker);
}
- /**
- * Returns an {@link AvroSource} that's like this one but reads files containing records that
- * conform to the given schema.
- *
- * <p>Does not modify this object.
- */
+ /** Like {@link #withSchema(String)}. */
public AvroSource<GenericRecord> withSchema(Schema schema) {
return new AvroSource<>(getFileOrPatternSpec(), getMinBundleSize(), schema.toString(),
GenericRecord.class, codec, syncMarker);
}
- /**
- * Returns an {@link AvroSource} that's like this one but reads files containing records of the
- * type of the given class.
- *
- * <p>Does not modify this object.
- */
+ /** Reads files containing records of the given class. */
public <X> AvroSource<X> withSchema(Class<X> clazz) {
- return new AvroSource<X>(getFileOrPatternSpec(), getMinBundleSize(),
+ return new AvroSource<>(getFileOrPatternSpec(), getMinBundleSize(),
ReflectData.get().getSchema(clazz).toString(), clazz, codec, syncMarker);
}
/**
- * Returns an {@link AvroSource} that's like this one but uses the supplied minimum bundle size.
- * Refer to {@link OffsetBasedSource} for a description of {@code minBundleSize} and its use.
- *
- * <p>Does not modify this object.
+ * Sets the minimum bundle size. Refer to {@link OffsetBasedSource} for a description of {@code
+ * minBundleSize} and its use.
*/
public AvroSource<T> withMinBundleSize(long minBundleSize) {
return new AvroSource<>(
- getFileOrPatternSpec(), minBundleSize, readSchemaString, type, codec, syncMarker);
+ getFileOrPatternSpec(), minBundleSize, readerSchemaString, type, codec, syncMarker);
}
- private AvroSource(String fileNameOrPattern, long minBundleSize, String schema, Class<T> type,
- String codec, byte[] syncMarker) {
+ /** Constructor for FILEPATTERN mode. */
+ private AvroSource(
+ String fileNameOrPattern,
+ long minBundleSize,
+ String readerSchemaString,
+ Class<T> type,
+ String codec,
+ byte[] syncMarker) {
super(fileNameOrPattern, minBundleSize);
- this.readSchemaString = internSchemaString(schema);
+ this.readerSchemaString = internSchemaString(readerSchemaString);
this.codec = codec;
this.syncMarker = syncMarker;
this.type = type;
- this.fileSchemaString = null;
+ this.writerSchemaString = null;
}
- private AvroSource(Metadata metadata, long minBundleSize, long startOffset, long endOffset,
- String schema, Class<T> type, String codec, byte[] syncMarker, String fileSchema) {
+ /** Constructor for SINGLE_FILE_OR_SUBRANGE mode. */
+ private AvroSource(
+ Metadata metadata,
+ long minBundleSize,
+ long startOffset,
+ long endOffset,
+ String readerSchemaString,
+ Class<T> type,
+ String codec,
+ byte[] syncMarker,
+ String writerSchemaString) {
super(metadata, minBundleSize, startOffset, endOffset);
- this.readSchemaString = internSchemaString(schema);
+ this.readerSchemaString = internSchemaString(readerSchemaString);
this.codec = codec;
this.syncMarker = syncMarker;
this.type = type;
- this.fileSchemaString = internSchemaString(fileSchema);
+ this.writerSchemaString = internSchemaString(writerSchemaString);
}
@Override
@@ -241,13 +231,12 @@ public class AvroSource<T> extends BlockBasedSource<T> {
public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) {
byte[] syncMarker = this.syncMarker;
String codec = this.codec;
- String readSchemaString = this.readSchemaString;
- String fileSchemaString = this.fileSchemaString;
+ String writerSchemaString = this.writerSchemaString;
// codec and syncMarker are initially null when the source is created, as they differ
// across input files and must be read from the file. Here, when we are creating a source
// for a subrange of a file, we can initialize these values. When the resulting AvroSource
// is further split, they do not need to be read again.
- if (codec == null || syncMarker == null || fileSchemaString == null) {
+ if (codec == null || syncMarker == null || writerSchemaString == null) {
AvroMetadata metadata;
try {
metadata = readMetadataFromFile(fileMetadata.resourceId());
@@ -256,20 +245,23 @@ public class AvroSource<T> extends BlockBasedSource<T> {
}
codec = metadata.getCodec();
syncMarker = metadata.getSyncMarker();
- fileSchemaString = metadata.getSchemaString();
- // If the source was created with a null schema, use the schema that we read from the file's
- // metadata.
- if (readSchemaString == null) {
- readSchemaString = metadata.getSchemaString();
- }
+ writerSchemaString = metadata.getSchemaString();
}
- // Note that if the fileSchemaString is equivalent to the readSchemaString, "intern"ing
+ // Note that if the writerSchemaString is equivalent to the readerSchemaString, "intern"ing
// the string will occur within the constructor and return the same reference as the
- // readSchemaString. This allows for Java to have an efficient serialization since it
+ // readerSchemaString. This allows for Java to have an efficient serialization since it
// will only encode the schema once while just storing pointers to the encoded version
// within this source.
- return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, readSchemaString, type,
- codec, syncMarker, fileSchemaString);
+ return new AvroSource<>(
+ fileMetadata,
+ getMinBundleSize(),
+ start,
+ end,
+ readerSchemaString,
+ type,
+ codec,
+ syncMarker,
+ writerSchemaString);
}
@Override
@@ -279,40 +271,13 @@ public class AvroSource<T> extends BlockBasedSource<T> {
@Override
public AvroCoder<T> getDefaultOutputCoder() {
- if (coder == null) {
- coder = AvroCoder.of(type, internOrParseSchemaString(readSchemaString));
- }
- return coder;
- }
-
- public String getSchema() {
- return readSchemaString;
- }
-
- @VisibleForTesting
- Schema getReadSchema() {
- if (readSchemaString == null) {
- return null;
- }
-
- // If the schema has not been parsed, parse it.
- if (readSchema == null) {
- readSchema = internOrParseSchemaString(readSchemaString);
- }
- return readSchema;
+ return AvroCoder.of(type, internOrParseSchemaString(readerSchemaString));
}
@VisibleForTesting
- Schema getFileSchema() {
- if (fileSchemaString == null) {
- return null;
- }
-
- // If the schema has not been parsed, parse it.
- if (fileSchema == null) {
- fileSchema = internOrParseSchemaString(fileSchemaString);
- }
- return fileSchema;
+ @Nullable
+ String getReaderSchemaString() {
+ return readerSchemaString;
}
private byte[] getSyncMarker() {
@@ -427,14 +392,14 @@ public class AvroSource<T> extends BlockBasedSource<T> {
}
private DatumReader<T> createDatumReader() {
- Schema readSchema = getReadSchema();
- Schema fileSchema = getFileSchema();
- checkNotNull(readSchema, "No read schema has been initialized for source %s", this);
- checkNotNull(fileSchema, "No file schema has been initialized for source %s", this);
+ checkNotNull(writerSchemaString, "No writer schema has been initialized for source %s", this);
+ Schema writerSchema = internOrParseSchemaString(writerSchemaString);
+ Schema readerSchema =
+ (readerSchemaString == null) ? writerSchema : internOrParseSchemaString(readerSchemaString);
if (type == GenericRecord.class) {
- return new GenericDatumReader<>(fileSchema, readSchema);
+ return new GenericDatumReader<>(writerSchema, readerSchema);
} else {
- return new ReflectDatumReader<>(fileSchema, readSchema);
+ return new ReflectDatumReader<>(writerSchema, readerSchema);
}
}
@@ -477,16 +442,16 @@ public class AvroSource<T> extends BlockBasedSource<T> {
getMinBundleSize(),
getStartOffset(),
getEndOffset(),
- readSchemaString,
+ readerSchemaString,
type,
codec,
syncMarker,
- fileSchemaString);
+ writerSchemaString);
case FILEPATTERN:
return new AvroSource<>(
getFileOrPatternSpec(),
getMinBundleSize(),
- readSchemaString,
+ readerSchemaString,
type,
codec,
syncMarker);
http://git-wip-us.apache.org/repos/asf/beam/blob/c52a908c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index d6facba..0fc2b3e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -445,33 +445,11 @@ public class AvroSourceTest {
AvroSource<GenericRecord> sourceA = AvroSource.from(filename).withSchema(schemaA);
AvroSource<GenericRecord> sourceB = AvroSource.from(filename).withSchema(schemaB);
- assertSame(sourceA.getSchema(), sourceB.getSchema());
+ assertSame(sourceA.getReaderSchemaString(), sourceB.getReaderSchemaString());
// Ensure that deserialization still goes through interning
AvroSource<GenericRecord> sourceC = SerializableUtils.clone(sourceB);
- assertSame(sourceA.getSchema(), sourceC.getSchema());
- }
-
- @Test
- public void testSchemaIsInterned() throws Exception {
- List<Bird> birds = createRandomRecords(100);
- String filename = generateTestFile("tmp.avro", birds, SyncBehavior.SYNC_DEFAULT, 0,
- AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
- Metadata fileMetadata = FileSystems.matchSingleFileSpec(filename);
- String schemaA = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
- String schemaB = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
- assertNotSame(schemaA, schemaB);
-
- AvroSource<GenericRecord> sourceA = (AvroSource<GenericRecord>) AvroSource.from(filename)
- .withSchema(schemaA).createForSubrangeOfFile(fileMetadata, 0L, 0L);
- AvroSource<GenericRecord> sourceB = (AvroSource<GenericRecord>) AvroSource.from(filename)
- .withSchema(schemaB).createForSubrangeOfFile(fileMetadata, 0L, 0L);
- assertSame(sourceA.getReadSchema(), sourceA.getFileSchema());
- assertSame(sourceA.getReadSchema(), sourceB.getReadSchema());
- assertSame(sourceA.getReadSchema(), sourceB.getFileSchema());
-
- // Schemas are transient and not serialized thus we don't need to worry about interning
- // after deserialization.
+ assertSame(sourceA.getReaderSchemaString(), sourceC.getReaderSchemaString());
}
private void assertEqualsWithGeneric(List<Bird> expected, List<GenericRecord> actual) {