You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/21 00:06:02 UTC
[1/3] beam git commit: Gets rid of opening Avro files in createForSubrangeOfFile codepath
Repository: beam
Updated Branches:
refs/heads/master 7e63d2cf6 -> 1d9160fa3
Gets rid of opening Avro files in createForSubrangeOfFile codepath
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4026da1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4026da1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4026da1
Branch: refs/heads/master
Commit: d4026da1ad1fa0864052b85a66c4af5975327e9f
Parents: c52a908
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jul 18 14:09:03 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 20 16:59:11 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroSource.java | 176 +++++++------------
.../org/apache/beam/sdk/io/AvroSourceTest.java | 11 +-
2 files changed, 63 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d4026da1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 0634774..30af344 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
@@ -135,45 +136,33 @@ public class AvroSource<T> extends BlockBasedSource<T> {
@Nullable
private final String readerSchemaString;
- // The JSON schema that was used to write the source Avro file (may differ from the schema we will
- // use to read from it).
- private final String writerSchemaString;
-
- // The following metadata fields are not user-configurable. They are extracted from the object
- // container file header upon subsource creation.
-
- // The codec used to encode the blocks in the Avro file. String value drawn from those in
- // https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
- private final String codec;
-
- // The object container file's 16-byte sync marker.
- private final byte[] syncMarker;
-
/**
* Reads from the given file name or pattern ("glob"). The returned source can be further
* configured by calling {@link #withSchema} to return a type other than {@link GenericRecord}.
*/
public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
- return new AvroSource<>(
- fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class, null, null);
+ return new AvroSource<>(fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class);
}
/** Reads files containing records that conform to the given schema. */
public AvroSource<GenericRecord> withSchema(String schema) {
return new AvroSource<>(
- getFileOrPatternSpec(), getMinBundleSize(), schema, GenericRecord.class, codec, syncMarker);
+ getFileOrPatternSpec(), getMinBundleSize(), schema, GenericRecord.class);
}
/** Like {@link #withSchema(String)}. */
public AvroSource<GenericRecord> withSchema(Schema schema) {
- return new AvroSource<>(getFileOrPatternSpec(), getMinBundleSize(), schema.toString(),
- GenericRecord.class, codec, syncMarker);
+ return new AvroSource<>(
+ getFileOrPatternSpec(), getMinBundleSize(), schema.toString(), GenericRecord.class);
}
/** Reads files containing records of the given class. */
public <X> AvroSource<X> withSchema(Class<X> clazz) {
- return new AvroSource<>(getFileOrPatternSpec(), getMinBundleSize(),
- ReflectData.get().getSchema(clazz).toString(), clazz, codec, syncMarker);
+ return new AvroSource<>(
+ getFileOrPatternSpec(),
+ getMinBundleSize(),
+ ReflectData.get().getSchema(clazz).toString(),
+ clazz);
}
/**
@@ -181,24 +170,15 @@ public class AvroSource<T> extends BlockBasedSource<T> {
* minBundleSize} and its use.
*/
public AvroSource<T> withMinBundleSize(long minBundleSize) {
- return new AvroSource<>(
- getFileOrPatternSpec(), minBundleSize, readerSchemaString, type, codec, syncMarker);
+ return new AvroSource<>(getFileOrPatternSpec(), minBundleSize, readerSchemaString, type);
}
/** Constructor for FILEPATTERN mode. */
private AvroSource(
- String fileNameOrPattern,
- long minBundleSize,
- String readerSchemaString,
- Class<T> type,
- String codec,
- byte[] syncMarker) {
+ String fileNameOrPattern, long minBundleSize, String readerSchemaString, Class<T> type) {
super(fileNameOrPattern, minBundleSize);
this.readerSchemaString = internSchemaString(readerSchemaString);
- this.codec = codec;
- this.syncMarker = syncMarker;
this.type = type;
- this.writerSchemaString = null;
}
/** Constructor for SINGLE_FILE_OR_SUBRANGE mode. */
@@ -208,16 +188,10 @@ public class AvroSource<T> extends BlockBasedSource<T> {
long startOffset,
long endOffset,
String readerSchemaString,
- Class<T> type,
- String codec,
- byte[] syncMarker,
- String writerSchemaString) {
+ Class<T> type) {
super(metadata, minBundleSize, startOffset, endOffset);
this.readerSchemaString = internSchemaString(readerSchemaString);
- this.codec = codec;
- this.syncMarker = syncMarker;
this.type = type;
- this.writerSchemaString = internSchemaString(writerSchemaString);
}
@Override
@@ -229,39 +203,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
@Override
public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) {
- byte[] syncMarker = this.syncMarker;
- String codec = this.codec;
- String writerSchemaString = this.writerSchemaString;
- // codec and syncMarker are initially null when the source is created, as they differ
- // across input files and must be read from the file. Here, when we are creating a source
- // for a subrange of a file, we can initialize these values. When the resulting AvroSource
- // is further split, they do not need to be read again.
- if (codec == null || syncMarker == null || writerSchemaString == null) {
- AvroMetadata metadata;
- try {
- metadata = readMetadataFromFile(fileMetadata.resourceId());
- } catch (IOException e) {
- throw new RuntimeException("Error reading metadata from file " + fileMetadata, e);
- }
- codec = metadata.getCodec();
- syncMarker = metadata.getSyncMarker();
- writerSchemaString = metadata.getSchemaString();
- }
- // Note that if the writerSchemaString is equivalent to the readerSchemaString, "intern"ing
- // the string will occur within the constructor and return the same reference as the
- // readerSchemaString. This allows for Java to have an efficient serialization since it
- // will only encode the schema once while just storing pointers to the encoded version
- // within this source.
- return new AvroSource<>(
- fileMetadata,
- getMinBundleSize(),
- start,
- end,
- readerSchemaString,
- type,
- codec,
- syncMarker,
- writerSchemaString);
+ return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, readerSchemaString, type);
}
@Override
@@ -280,27 +222,17 @@ public class AvroSource<T> extends BlockBasedSource<T> {
return readerSchemaString;
}
- private byte[] getSyncMarker() {
- return syncMarker;
- }
-
- private String getCodec() {
- return codec;
- }
-
- /**
- * Avro file metadata.
- */
+ /** Avro file metadata. */
@VisibleForTesting
static class AvroMetadata {
- private byte[] syncMarker;
- private String codec;
- private String schemaString;
+ private final byte[] syncMarker;
+ private final String codec;
+ private final String schemaString;
AvroMetadata(byte[] syncMarker, String codec, String schemaString) {
this.syncMarker = checkNotNull(syncMarker, "syncMarker");
this.codec = checkNotNull(codec, "codec");
- this.schemaString = checkNotNull(schemaString, "schemaString");
+ this.schemaString = internSchemaString(checkNotNull(schemaString, "schemaString"));
}
/**
@@ -391,18 +323,6 @@ public class AvroSource<T> extends BlockBasedSource<T> {
return new AvroMetadata(syncMarker, codec, schemaString);
}
- private DatumReader<T> createDatumReader() {
- checkNotNull(writerSchemaString, "No writer schema has been initialized for source %s", this);
- Schema writerSchema = internOrParseSchemaString(writerSchemaString);
- Schema readerSchema =
- (readerSchemaString == null) ? writerSchema : internOrParseSchemaString(readerSchemaString);
- if (type == GenericRecord.class) {
- return new GenericDatumReader<>(writerSchema, readerSchema);
- } else {
- return new ReflectDatumReader<>(writerSchema, readerSchema);
- }
- }
-
// A logical reference cache used to store schemas and schema strings to allow us to
// "intern" values and reduce the number of copies of equivalent objects.
private static final Map<String, Schema> schemaLogicalReferenceCache = new WeakHashMap<>();
@@ -443,18 +363,10 @@ public class AvroSource<T> extends BlockBasedSource<T> {
getStartOffset(),
getEndOffset(),
readerSchemaString,
- type,
- codec,
- syncMarker,
- writerSchemaString);
+ type);
case FILEPATTERN:
return new AvroSource<>(
- getFileOrPatternSpec(),
- getMinBundleSize(),
- readerSchemaString,
- type,
- codec,
- syncMarker);
+ getFileOrPatternSpec(), getMinBundleSize(), readerSchemaString, type);
default:
throw new InvalidObjectException(
String.format("Unknown mode %s for AvroSource %s", getMode(), this));
@@ -518,11 +430,25 @@ public class AvroSource<T> extends BlockBasedSource<T> {
}
}
- AvroBlock(byte[] data, long numRecords, AvroSource<T> source) throws IOException {
+ AvroBlock(
+ byte[] data,
+ long numRecords,
+ Class<? extends T> type,
+ String readerSchemaString,
+ String writerSchemaString,
+ String codec)
+ throws IOException {
this.numRecords = numRecords;
- this.reader = source.createDatumReader();
- this.decoder =
- DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, source.getCodec()), null);
+ checkNotNull(writerSchemaString, "writerSchemaString");
+ Schema writerSchema = internOrParseSchemaString(writerSchemaString);
+ Schema readerSchema =
+ internOrParseSchemaString(
+ MoreObjects.firstNonNull(readerSchemaString, writerSchemaString));
+ this.reader =
+ (type == GenericRecord.class)
+ ? new GenericDatumReader<T>(writerSchema, readerSchema)
+ : new ReflectDatumReader<T>(writerSchema, readerSchema);
+ this.decoder = DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, codec), null);
}
@Override
@@ -558,6 +484,8 @@ public class AvroSource<T> extends BlockBasedSource<T> {
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public static class AvroReader<T> extends BlockBasedReader<T> {
+ private AvroMetadata metadata;
+
// The current block.
private AvroBlock<T> currentBlock;
@@ -631,10 +559,17 @@ public class AvroSource<T> extends BlockBasedSource<T> {
"Only able to read %s/%s bytes in the block before EOF reached.",
bytesRead,
blockSize);
- currentBlock = new AvroBlock<>(data, numRecords, getCurrentSource());
+ currentBlock =
+ new AvroBlock<>(
+ data,
+ numRecords,
+ getCurrentSource().type,
+ getCurrentSource().readerSchemaString,
+ metadata.getSchemaString(),
+ metadata.getCodec());
// Read the end of this block, which MUST be a sync marker for correctness.
- byte[] syncMarker = getCurrentSource().getSyncMarker();
+ byte[] syncMarker = metadata.getSyncMarker();
byte[] readSyncMarker = new byte[syncMarker.length];
long syncMarkerOffset = startOfNextBlock + headerSize + blockSize;
bytesRead = IOUtils.readFully(stream, readSyncMarker);
@@ -705,7 +640,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
private PushbackInputStream createStream(ReadableByteChannel channel) {
return new PushbackInputStream(
Channels.newInputStream(channel),
- getCurrentSource().getSyncMarker().length);
+ metadata.getSyncMarker().length);
}
// Postcondition: the stream is positioned at the beginning of the first block after the start
@@ -713,8 +648,15 @@ public class AvroSource<T> extends BlockBasedSource<T> {
// currentBlockSizeBytes will be set to 0 indicating that the previous block was empty.
@Override
protected void startReading(ReadableByteChannel channel) throws IOException {
+ try {
+ metadata = readMetadataFromFile(getCurrentSource().getSingleFileMetadata().resourceId());
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Error reading metadata from file " + getCurrentSource().getSingleFileMetadata(), e);
+ }
+
long startOffset = getCurrentSource().getStartOffset();
- byte[] syncMarker = getCurrentSource().getSyncMarker();
+ byte[] syncMarker = metadata.getSyncMarker();
long syncMarkerLength = syncMarker.length;
if (startOffset != 0) {
http://git-wip-us.apache.org/repos/asf/beam/blob/d4026da1/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index 0fc2b3e..bf2ac95 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -21,7 +21,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -439,12 +438,10 @@ public class AvroSourceTest {
String filename = generateTestFile("tmp.avro", birds, SyncBehavior.SYNC_DEFAULT, 0,
AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
Metadata fileMetadata = FileSystems.matchSingleFileSpec(filename);
- String schemaA = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
- String schemaB = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
- assertNotSame(schemaA, schemaB);
-
- AvroSource<GenericRecord> sourceA = AvroSource.from(filename).withSchema(schemaA);
- AvroSource<GenericRecord> sourceB = AvroSource.from(filename).withSchema(schemaB);
+ String schema = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
+ // Add "" to the schema to make sure it is not interned.
+ AvroSource<GenericRecord> sourceA = AvroSource.from(filename).withSchema("" + schema);
+ AvroSource<GenericRecord> sourceB = AvroSource.from(filename).withSchema("" + schema);
assertSame(sourceA.getReaderSchemaString(), sourceB.getReaderSchemaString());
// Ensure that deserialization still goes through interning
[3/3] beam git commit: This closes #3590: [BEAM-2628] Makes AvroSource not open files while splitting
Posted by jk...@apache.org.
This closes #3590: [BEAM-2628] Makes AvroSource not open files while splitting
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1d9160fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1d9160fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1d9160fa
Branch: refs/heads/master
Commit: 1d9160fa3337704b9fbdc423796925be78b0087e
Parents: 7e63d2c d4026da
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Jul 20 16:59:27 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 20 16:59:27 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroSource.java | 265 ++++++-------------
.../org/apache/beam/sdk/io/AvroSourceTest.java | 37 +--
2 files changed, 92 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: Minor changes to AvroSource in preparation for refactoring
Posted by jk...@apache.org.
Minor changes to AvroSource in preparation for refactoring
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c52a908c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c52a908c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c52a908c
Branch: refs/heads/master
Commit: c52a908cba7765e120a94909ab02c548d1a124ad
Parents: 7e63d2c
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jul 18 13:40:52 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 20 16:59:11 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroSource.java | 171 ++++++++-----------
.../org/apache/beam/sdk/io/AvroSourceTest.java | 26 +--
2 files changed, 70 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c52a908c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 575218b..0634774 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -37,6 +37,7 @@ import java.util.Map;
import java.util.WeakHashMap;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
@@ -127,15 +128,16 @@ public class AvroSource<T> extends BlockBasedSource<T> {
// The default sync interval is 64k.
private static final long DEFAULT_MIN_BUNDLE_SIZE = 2 * DataFileConstants.DEFAULT_SYNC_INTERVAL;
- // The JSON schema used to encode records.
- private final String readSchemaString;
+ // The type of the records contained in the file.
+ private final Class<T> type;
+
+ // The JSON schema used to decode records.
+ @Nullable
+ private final String readerSchemaString;
// The JSON schema that was used to write the source Avro file (may differ from the schema we will
// use to read from it).
- private final String fileSchemaString;
-
- // The type of the records contained in the file.
- private final Class<T> type;
+ private final String writerSchemaString;
// The following metadata fields are not user-configurable. They are extracted from the object
// container file header upon subsource creation.
@@ -147,87 +149,75 @@ public class AvroSource<T> extends BlockBasedSource<T> {
// The object container file's 16-byte sync marker.
private final byte[] syncMarker;
- // Default output coder, lazily initialized.
- private transient AvroCoder<T> coder = null;
-
- // Schema of the file, lazily initialized.
- private transient Schema fileSchema;
-
- // Schema used to encode records, lazily initialized.
- private transient Schema readSchema;
-
/**
- * Creates an {@link AvroSource} that reads from the given file name or pattern ("glob"). The
- * returned source can be further configured by calling {@link #withSchema} to return a type other
- * than {@link GenericRecord}.
+ * Reads from the given file name or pattern ("glob"). The returned source can be further
+ * configured by calling {@link #withSchema} to return a type other than {@link GenericRecord}.
*/
public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
return new AvroSource<>(
fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class, null, null);
}
- /**
- * Returns an {@link AvroSource} that's like this one but reads files containing records that
- * conform to the given schema.
- *
- * <p>Does not modify this object.
- */
+ /** Reads files containing records that conform to the given schema. */
public AvroSource<GenericRecord> withSchema(String schema) {
return new AvroSource<>(
getFileOrPatternSpec(), getMinBundleSize(), schema, GenericRecord.class, codec, syncMarker);
}
- /**
- * Returns an {@link AvroSource} that's like this one but reads files containing records that
- * conform to the given schema.
- *
- * <p>Does not modify this object.
- */
+ /** Like {@link #withSchema(String)}. */
public AvroSource<GenericRecord> withSchema(Schema schema) {
return new AvroSource<>(getFileOrPatternSpec(), getMinBundleSize(), schema.toString(),
GenericRecord.class, codec, syncMarker);
}
- /**
- * Returns an {@link AvroSource} that's like this one but reads files containing records of the
- * type of the given class.
- *
- * <p>Does not modify this object.
- */
+ /** Reads files containing records of the given class. */
public <X> AvroSource<X> withSchema(Class<X> clazz) {
- return new AvroSource<X>(getFileOrPatternSpec(), getMinBundleSize(),
+ return new AvroSource<>(getFileOrPatternSpec(), getMinBundleSize(),
ReflectData.get().getSchema(clazz).toString(), clazz, codec, syncMarker);
}
/**
- * Returns an {@link AvroSource} that's like this one but uses the supplied minimum bundle size.
- * Refer to {@link OffsetBasedSource} for a description of {@code minBundleSize} and its use.
- *
- * <p>Does not modify this object.
+ * Sets the minimum bundle size. Refer to {@link OffsetBasedSource} for a description of {@code
+ * minBundleSize} and its use.
*/
public AvroSource<T> withMinBundleSize(long minBundleSize) {
return new AvroSource<>(
- getFileOrPatternSpec(), minBundleSize, readSchemaString, type, codec, syncMarker);
+ getFileOrPatternSpec(), minBundleSize, readerSchemaString, type, codec, syncMarker);
}
- private AvroSource(String fileNameOrPattern, long minBundleSize, String schema, Class<T> type,
- String codec, byte[] syncMarker) {
+ /** Constructor for FILEPATTERN mode. */
+ private AvroSource(
+ String fileNameOrPattern,
+ long minBundleSize,
+ String readerSchemaString,
+ Class<T> type,
+ String codec,
+ byte[] syncMarker) {
super(fileNameOrPattern, minBundleSize);
- this.readSchemaString = internSchemaString(schema);
+ this.readerSchemaString = internSchemaString(readerSchemaString);
this.codec = codec;
this.syncMarker = syncMarker;
this.type = type;
- this.fileSchemaString = null;
+ this.writerSchemaString = null;
}
- private AvroSource(Metadata metadata, long minBundleSize, long startOffset, long endOffset,
- String schema, Class<T> type, String codec, byte[] syncMarker, String fileSchema) {
+ /** Constructor for SINGLE_FILE_OR_SUBRANGE mode. */
+ private AvroSource(
+ Metadata metadata,
+ long minBundleSize,
+ long startOffset,
+ long endOffset,
+ String readerSchemaString,
+ Class<T> type,
+ String codec,
+ byte[] syncMarker,
+ String writerSchemaString) {
super(metadata, minBundleSize, startOffset, endOffset);
- this.readSchemaString = internSchemaString(schema);
+ this.readerSchemaString = internSchemaString(readerSchemaString);
this.codec = codec;
this.syncMarker = syncMarker;
this.type = type;
- this.fileSchemaString = internSchemaString(fileSchema);
+ this.writerSchemaString = internSchemaString(writerSchemaString);
}
@Override
@@ -241,13 +231,12 @@ public class AvroSource<T> extends BlockBasedSource<T> {
public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) {
byte[] syncMarker = this.syncMarker;
String codec = this.codec;
- String readSchemaString = this.readSchemaString;
- String fileSchemaString = this.fileSchemaString;
+ String writerSchemaString = this.writerSchemaString;
// codec and syncMarker are initially null when the source is created, as they differ
// across input files and must be read from the file. Here, when we are creating a source
// for a subrange of a file, we can initialize these values. When the resulting AvroSource
// is further split, they do not need to be read again.
- if (codec == null || syncMarker == null || fileSchemaString == null) {
+ if (codec == null || syncMarker == null || writerSchemaString == null) {
AvroMetadata metadata;
try {
metadata = readMetadataFromFile(fileMetadata.resourceId());
@@ -256,20 +245,23 @@ public class AvroSource<T> extends BlockBasedSource<T> {
}
codec = metadata.getCodec();
syncMarker = metadata.getSyncMarker();
- fileSchemaString = metadata.getSchemaString();
- // If the source was created with a null schema, use the schema that we read from the file's
- // metadata.
- if (readSchemaString == null) {
- readSchemaString = metadata.getSchemaString();
- }
+ writerSchemaString = metadata.getSchemaString();
}
- // Note that if the fileSchemaString is equivalent to the readSchemaString, "intern"ing
+ // Note that if the writerSchemaString is equivalent to the readerSchemaString, "intern"ing
// the string will occur within the constructor and return the same reference as the
- // readSchemaString. This allows for Java to have an efficient serialization since it
+ // readerSchemaString. This allows for Java to have an efficient serialization since it
// will only encode the schema once while just storing pointers to the encoded version
// within this source.
- return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, readSchemaString, type,
- codec, syncMarker, fileSchemaString);
+ return new AvroSource<>(
+ fileMetadata,
+ getMinBundleSize(),
+ start,
+ end,
+ readerSchemaString,
+ type,
+ codec,
+ syncMarker,
+ writerSchemaString);
}
@Override
@@ -279,40 +271,13 @@ public class AvroSource<T> extends BlockBasedSource<T> {
@Override
public AvroCoder<T> getDefaultOutputCoder() {
- if (coder == null) {
- coder = AvroCoder.of(type, internOrParseSchemaString(readSchemaString));
- }
- return coder;
- }
-
- public String getSchema() {
- return readSchemaString;
- }
-
- @VisibleForTesting
- Schema getReadSchema() {
- if (readSchemaString == null) {
- return null;
- }
-
- // If the schema has not been parsed, parse it.
- if (readSchema == null) {
- readSchema = internOrParseSchemaString(readSchemaString);
- }
- return readSchema;
+ return AvroCoder.of(type, internOrParseSchemaString(readerSchemaString));
}
@VisibleForTesting
- Schema getFileSchema() {
- if (fileSchemaString == null) {
- return null;
- }
-
- // If the schema has not been parsed, parse it.
- if (fileSchema == null) {
- fileSchema = internOrParseSchemaString(fileSchemaString);
- }
- return fileSchema;
+ @Nullable
+ String getReaderSchemaString() {
+ return readerSchemaString;
}
private byte[] getSyncMarker() {
@@ -427,14 +392,14 @@ public class AvroSource<T> extends BlockBasedSource<T> {
}
private DatumReader<T> createDatumReader() {
- Schema readSchema = getReadSchema();
- Schema fileSchema = getFileSchema();
- checkNotNull(readSchema, "No read schema has been initialized for source %s", this);
- checkNotNull(fileSchema, "No file schema has been initialized for source %s", this);
+ checkNotNull(writerSchemaString, "No writer schema has been initialized for source %s", this);
+ Schema writerSchema = internOrParseSchemaString(writerSchemaString);
+ Schema readerSchema =
+ (readerSchemaString == null) ? writerSchema : internOrParseSchemaString(readerSchemaString);
if (type == GenericRecord.class) {
- return new GenericDatumReader<>(fileSchema, readSchema);
+ return new GenericDatumReader<>(writerSchema, readerSchema);
} else {
- return new ReflectDatumReader<>(fileSchema, readSchema);
+ return new ReflectDatumReader<>(writerSchema, readerSchema);
}
}
@@ -477,16 +442,16 @@ public class AvroSource<T> extends BlockBasedSource<T> {
getMinBundleSize(),
getStartOffset(),
getEndOffset(),
- readSchemaString,
+ readerSchemaString,
type,
codec,
syncMarker,
- fileSchemaString);
+ writerSchemaString);
case FILEPATTERN:
return new AvroSource<>(
getFileOrPatternSpec(),
getMinBundleSize(),
- readSchemaString,
+ readerSchemaString,
type,
codec,
syncMarker);
http://git-wip-us.apache.org/repos/asf/beam/blob/c52a908c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index d6facba..0fc2b3e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -445,33 +445,11 @@ public class AvroSourceTest {
AvroSource<GenericRecord> sourceA = AvroSource.from(filename).withSchema(schemaA);
AvroSource<GenericRecord> sourceB = AvroSource.from(filename).withSchema(schemaB);
- assertSame(sourceA.getSchema(), sourceB.getSchema());
+ assertSame(sourceA.getReaderSchemaString(), sourceB.getReaderSchemaString());
// Ensure that deserialization still goes through interning
AvroSource<GenericRecord> sourceC = SerializableUtils.clone(sourceB);
- assertSame(sourceA.getSchema(), sourceC.getSchema());
- }
-
- @Test
- public void testSchemaIsInterned() throws Exception {
- List<Bird> birds = createRandomRecords(100);
- String filename = generateTestFile("tmp.avro", birds, SyncBehavior.SYNC_DEFAULT, 0,
- AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
- Metadata fileMetadata = FileSystems.matchSingleFileSpec(filename);
- String schemaA = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
- String schemaB = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
- assertNotSame(schemaA, schemaB);
-
- AvroSource<GenericRecord> sourceA = (AvroSource<GenericRecord>) AvroSource.from(filename)
- .withSchema(schemaA).createForSubrangeOfFile(fileMetadata, 0L, 0L);
- AvroSource<GenericRecord> sourceB = (AvroSource<GenericRecord>) AvroSource.from(filename)
- .withSchema(schemaB).createForSubrangeOfFile(fileMetadata, 0L, 0L);
- assertSame(sourceA.getReadSchema(), sourceA.getFileSchema());
- assertSame(sourceA.getReadSchema(), sourceB.getReadSchema());
- assertSame(sourceA.getReadSchema(), sourceB.getFileSchema());
-
- // Schemas are transient and not serialized thus we don't need to worry about interning
- // after deserialization.
+ assertSame(sourceA.getReaderSchemaString(), sourceC.getReaderSchemaString());
}
private void assertEqualsWithGeneric(List<Bird> expected, List<GenericRecord> actual) {