You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/21 00:06:02 UTC

[1/3] beam git commit: Gets rid of opening Avro files in createForSubrangeOfFile codepath

Repository: beam
Updated Branches:
  refs/heads/master 7e63d2cf6 -> 1d9160fa3


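Gets rid of opening Avro files in the createForSubrangeOfFile code path: AvroSource no longer stores the codec, sync marker, or writer schema, and AvroReader instead reads them from the file header in startReading().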


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4026da1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4026da1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4026da1

Branch: refs/heads/master
Commit: d4026da1ad1fa0864052b85a66c4af5975327e9f
Parents: c52a908
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jul 18 14:09:03 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 20 16:59:11 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroSource.java | 176 +++++++------------
 .../org/apache/beam/sdk/io/AvroSourceTest.java  |  11 +-
 2 files changed, 63 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d4026da1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 0634774..30af344 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
 import java.io.ByteArrayInputStream;
 import java.io.EOFException;
 import java.io.IOException;
@@ -135,45 +136,33 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   @Nullable
   private final String readerSchemaString;
 
-  // The JSON schema that was used to write the source Avro file (may differ from the schema we will
-  // use to read from it).
-  private final String writerSchemaString;
-
-  // The following metadata fields are not user-configurable. They are extracted from the object
-  // container file header upon subsource creation.
-
-  // The codec used to encode the blocks in the Avro file. String value drawn from those in
-  // https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
-  private final String codec;
-
-  // The object container file's 16-byte sync marker.
-  private final byte[] syncMarker;
-
   /**
    * Reads from the given file name or pattern ("glob"). The returned source can be further
    * configured by calling {@link #withSchema} to return a type other than {@link GenericRecord}.
    */
   public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
-    return new AvroSource<>(
-        fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class, null, null);
+    return new AvroSource<>(fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class);
   }
 
   /** Reads files containing records that conform to the given schema. */
   public AvroSource<GenericRecord> withSchema(String schema) {
     return new AvroSource<>(
-        getFileOrPatternSpec(), getMinBundleSize(), schema, GenericRecord.class, codec, syncMarker);
+        getFileOrPatternSpec(), getMinBundleSize(), schema, GenericRecord.class);
   }
 
   /** Like {@link #withSchema(String)}. */
   public AvroSource<GenericRecord> withSchema(Schema schema) {
-    return new AvroSource<>(getFileOrPatternSpec(), getMinBundleSize(), schema.toString(),
-        GenericRecord.class, codec, syncMarker);
+    return new AvroSource<>(
+        getFileOrPatternSpec(), getMinBundleSize(), schema.toString(), GenericRecord.class);
   }
 
   /** Reads files containing records of the given class. */
   public <X> AvroSource<X> withSchema(Class<X> clazz) {
-    return new AvroSource<>(getFileOrPatternSpec(), getMinBundleSize(),
-        ReflectData.get().getSchema(clazz).toString(), clazz, codec, syncMarker);
+    return new AvroSource<>(
+        getFileOrPatternSpec(),
+        getMinBundleSize(),
+        ReflectData.get().getSchema(clazz).toString(),
+        clazz);
   }
 
   /**
@@ -181,24 +170,15 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    * minBundleSize} and its use.
    */
   public AvroSource<T> withMinBundleSize(long minBundleSize) {
-    return new AvroSource<>(
-        getFileOrPatternSpec(), minBundleSize, readerSchemaString, type, codec, syncMarker);
+    return new AvroSource<>(getFileOrPatternSpec(), minBundleSize, readerSchemaString, type);
   }
 
   /** Constructor for FILEPATTERN mode. */
   private AvroSource(
-      String fileNameOrPattern,
-      long minBundleSize,
-      String readerSchemaString,
-      Class<T> type,
-      String codec,
-      byte[] syncMarker) {
+      String fileNameOrPattern, long minBundleSize, String readerSchemaString, Class<T> type) {
     super(fileNameOrPattern, minBundleSize);
     this.readerSchemaString = internSchemaString(readerSchemaString);
-    this.codec = codec;
-    this.syncMarker = syncMarker;
     this.type = type;
-    this.writerSchemaString = null;
   }
 
   /** Constructor for SINGLE_FILE_OR_SUBRANGE mode. */
@@ -208,16 +188,10 @@ public class AvroSource<T> extends BlockBasedSource<T> {
       long startOffset,
       long endOffset,
       String readerSchemaString,
-      Class<T> type,
-      String codec,
-      byte[] syncMarker,
-      String writerSchemaString) {
+      Class<T> type) {
     super(metadata, minBundleSize, startOffset, endOffset);
     this.readerSchemaString = internSchemaString(readerSchemaString);
-    this.codec = codec;
-    this.syncMarker = syncMarker;
     this.type = type;
-    this.writerSchemaString = internSchemaString(writerSchemaString);
   }
 
   @Override
@@ -229,39 +203,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
 
   @Override
   public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) {
-    byte[] syncMarker = this.syncMarker;
-    String codec = this.codec;
-    String writerSchemaString = this.writerSchemaString;
-    // codec and syncMarker are initially null when the source is created, as they differ
-    // across input files and must be read from the file. Here, when we are creating a source
-    // for a subrange of a file, we can initialize these values. When the resulting AvroSource
-    // is further split, they do not need to be read again.
-    if (codec == null || syncMarker == null || writerSchemaString == null) {
-      AvroMetadata metadata;
-      try {
-        metadata = readMetadataFromFile(fileMetadata.resourceId());
-      } catch (IOException e) {
-        throw new RuntimeException("Error reading metadata from file " + fileMetadata, e);
-      }
-      codec = metadata.getCodec();
-      syncMarker = metadata.getSyncMarker();
-      writerSchemaString = metadata.getSchemaString();
-    }
-    // Note that if the writerSchemaString is equivalent to the readerSchemaString, "intern"ing
-    // the string will occur within the constructor and return the same reference as the
-    // readerSchemaString. This allows for Java to have an efficient serialization since it
-    // will only encode the schema once while just storing pointers to the encoded version
-    // within this source.
-    return new AvroSource<>(
-        fileMetadata,
-        getMinBundleSize(),
-        start,
-        end,
-        readerSchemaString,
-        type,
-        codec,
-        syncMarker,
-        writerSchemaString);
+    return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, readerSchemaString, type);
   }
 
   @Override
@@ -280,27 +222,17 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     return readerSchemaString;
   }
 
-  private byte[] getSyncMarker() {
-    return syncMarker;
-  }
-
-  private String getCodec() {
-    return codec;
-  }
-
-  /**
-   * Avro file metadata.
-   */
+  /** Avro file metadata. */
   @VisibleForTesting
   static class AvroMetadata {
-    private byte[] syncMarker;
-    private String codec;
-    private String schemaString;
+    private final byte[] syncMarker;
+    private final String codec;
+    private final String schemaString;
 
     AvroMetadata(byte[] syncMarker, String codec, String schemaString) {
       this.syncMarker = checkNotNull(syncMarker, "syncMarker");
       this.codec = checkNotNull(codec, "codec");
-      this.schemaString = checkNotNull(schemaString, "schemaString");
+      this.schemaString = internSchemaString(checkNotNull(schemaString, "schemaString"));
     }
 
     /**
@@ -391,18 +323,6 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     return new AvroMetadata(syncMarker, codec, schemaString);
   }
 
-  private DatumReader<T> createDatumReader() {
-    checkNotNull(writerSchemaString, "No writer schema has been initialized for source %s", this);
-    Schema writerSchema = internOrParseSchemaString(writerSchemaString);
-    Schema readerSchema =
-        (readerSchemaString == null) ? writerSchema : internOrParseSchemaString(readerSchemaString);
-    if (type == GenericRecord.class) {
-      return new GenericDatumReader<>(writerSchema, readerSchema);
-    } else {
-      return new ReflectDatumReader<>(writerSchema, readerSchema);
-    }
-  }
-
   // A logical reference cache used to store schemas and schema strings to allow us to
   // "intern" values and reduce the number of copies of equivalent objects.
   private static final Map<String, Schema> schemaLogicalReferenceCache = new WeakHashMap<>();
@@ -443,18 +363,10 @@ public class AvroSource<T> extends BlockBasedSource<T> {
             getStartOffset(),
             getEndOffset(),
             readerSchemaString,
-            type,
-            codec,
-            syncMarker,
-            writerSchemaString);
+            type);
       case FILEPATTERN:
         return new AvroSource<>(
-            getFileOrPatternSpec(),
-            getMinBundleSize(),
-            readerSchemaString,
-            type,
-            codec,
-            syncMarker);
+            getFileOrPatternSpec(), getMinBundleSize(), readerSchemaString, type);
         default:
           throw new InvalidObjectException(
               String.format("Unknown mode %s for AvroSource %s", getMode(), this));
@@ -518,11 +430,25 @@ public class AvroSource<T> extends BlockBasedSource<T> {
       }
     }
 
-    AvroBlock(byte[] data, long numRecords, AvroSource<T> source) throws IOException {
+    AvroBlock(
+        byte[] data,
+        long numRecords,
+        Class<? extends T> type,
+        String readerSchemaString,
+        String writerSchemaString,
+        String codec)
+        throws IOException {
       this.numRecords = numRecords;
-      this.reader = source.createDatumReader();
-      this.decoder =
-          DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, source.getCodec()), null);
+      checkNotNull(writerSchemaString, "writerSchemaString");
+      Schema writerSchema = internOrParseSchemaString(writerSchemaString);
+      Schema readerSchema =
+          internOrParseSchemaString(
+              MoreObjects.firstNonNull(readerSchemaString, writerSchemaString));
+      this.reader =
+          (type == GenericRecord.class)
+              ? new GenericDatumReader<T>(writerSchema, readerSchema)
+              : new ReflectDatumReader<T>(writerSchema, readerSchema);
+      this.decoder = DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, codec), null);
     }
 
     @Override
@@ -558,6 +484,8 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    */
   @Experimental(Experimental.Kind.SOURCE_SINK)
   public static class AvroReader<T> extends BlockBasedReader<T> {
+    private AvroMetadata metadata;
+
     // The current block.
     private AvroBlock<T> currentBlock;
 
@@ -631,10 +559,17 @@ public class AvroSource<T> extends BlockBasedSource<T> {
           "Only able to read %s/%s bytes in the block before EOF reached.",
           bytesRead,
           blockSize);
-      currentBlock = new AvroBlock<>(data, numRecords, getCurrentSource());
+      currentBlock =
+          new AvroBlock<>(
+              data,
+              numRecords,
+              getCurrentSource().type,
+              getCurrentSource().readerSchemaString,
+              metadata.getSchemaString(),
+              metadata.getCodec());
 
       // Read the end of this block, which MUST be a sync marker for correctness.
-      byte[] syncMarker = getCurrentSource().getSyncMarker();
+      byte[] syncMarker = metadata.getSyncMarker();
       byte[] readSyncMarker = new byte[syncMarker.length];
       long syncMarkerOffset = startOfNextBlock + headerSize + blockSize;
       bytesRead = IOUtils.readFully(stream, readSyncMarker);
@@ -705,7 +640,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     private PushbackInputStream createStream(ReadableByteChannel channel) {
       return new PushbackInputStream(
           Channels.newInputStream(channel),
-          getCurrentSource().getSyncMarker().length);
+          metadata.getSyncMarker().length);
     }
 
     // Postcondition: the stream is positioned at the beginning of the first block after the start
@@ -713,8 +648,15 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     // currentBlockSizeBytes will be set to 0 indicating that the previous block was empty.
     @Override
     protected void startReading(ReadableByteChannel channel) throws IOException {
+      try {
+        metadata = readMetadataFromFile(getCurrentSource().getSingleFileMetadata().resourceId());
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "Error reading metadata from file " + getCurrentSource().getSingleFileMetadata(), e);
+      }
+
       long startOffset = getCurrentSource().getStartOffset();
-      byte[] syncMarker = getCurrentSource().getSyncMarker();
+      byte[] syncMarker = metadata.getSyncMarker();
       long syncMarkerLength = syncMarker.length;
 
       if (startOffset != 0) {

http://git-wip-us.apache.org/repos/asf/beam/blob/d4026da1/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index 0fc2b3e..bf2ac95 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -21,7 +21,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -439,12 +438,10 @@ public class AvroSourceTest {
     String filename = generateTestFile("tmp.avro", birds, SyncBehavior.SYNC_DEFAULT, 0,
         AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
     Metadata fileMetadata = FileSystems.matchSingleFileSpec(filename);
-    String schemaA = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
-    String schemaB = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
-    assertNotSame(schemaA, schemaB);
-
-    AvroSource<GenericRecord> sourceA = AvroSource.from(filename).withSchema(schemaA);
-    AvroSource<GenericRecord> sourceB = AvroSource.from(filename).withSchema(schemaB);
+    String schema = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
+    // Add "" to the schema to make sure it is not interned.
+    AvroSource<GenericRecord> sourceA = AvroSource.from(filename).withSchema("" + schema);
+    AvroSource<GenericRecord> sourceB = AvroSource.from(filename).withSchema("" + schema);
     assertSame(sourceA.getReaderSchemaString(), sourceB.getReaderSchemaString());
 
     // Ensure that deserialization still goes through interning


[3/3] beam git commit: This closes #3590: [BEAM-2628] Makes AvroSource not open files while splitting

Posted by jk...@apache.org.
This closes #3590: [BEAM-2628] Makes AvroSource not open files while splitting


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1d9160fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1d9160fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1d9160fa

Branch: refs/heads/master
Commit: 1d9160fa3337704b9fbdc423796925be78b0087e
Parents: 7e63d2c d4026da
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Jul 20 16:59:27 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 20 16:59:27 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroSource.java | 265 ++++++-------------
 .../org/apache/beam/sdk/io/AvroSourceTest.java  |  37 +--
 2 files changed, 92 insertions(+), 210 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: Minor changes to AvroSource in preparation for refactoring

Posted by jk...@apache.org.
Minor changes to AvroSource in preparation for refactoring


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c52a908c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c52a908c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c52a908c

Branch: refs/heads/master
Commit: c52a908cba7765e120a94909ab02c548d1a124ad
Parents: 7e63d2c
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jul 18 13:40:52 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 20 16:59:11 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroSource.java | 171 ++++++++-----------
 .../org/apache/beam/sdk/io/AvroSourceTest.java  |  26 +--
 2 files changed, 70 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c52a908c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 575218b..0634774 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -37,6 +37,7 @@ import java.util.Map;
 import java.util.WeakHashMap;
 import java.util.zip.Inflater;
 import java.util.zip.InflaterInputStream;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
@@ -127,15 +128,16 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   // The default sync interval is 64k.
   private static final long DEFAULT_MIN_BUNDLE_SIZE = 2 * DataFileConstants.DEFAULT_SYNC_INTERVAL;
 
-  // The JSON schema used to encode records.
-  private final String readSchemaString;
+  // The type of the records contained in the file.
+  private final Class<T> type;
+
+  // The JSON schema used to decode records.
+  @Nullable
+  private final String readerSchemaString;
 
   // The JSON schema that was used to write the source Avro file (may differ from the schema we will
   // use to read from it).
-  private final String fileSchemaString;
-
-  // The type of the records contained in the file.
-  private final Class<T> type;
+  private final String writerSchemaString;
 
   // The following metadata fields are not user-configurable. They are extracted from the object
   // container file header upon subsource creation.
@@ -147,87 +149,75 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   // The object container file's 16-byte sync marker.
   private final byte[] syncMarker;
 
-  // Default output coder, lazily initialized.
-  private transient AvroCoder<T> coder = null;
-
-  // Schema of the file, lazily initialized.
-  private transient Schema fileSchema;
-
-  // Schema used to encode records, lazily initialized.
-  private transient Schema readSchema;
-
   /**
-   * Creates an {@link AvroSource} that reads from the given file name or pattern ("glob"). The
-   * returned source can be further configured by calling {@link #withSchema} to return a type other
-   * than {@link GenericRecord}.
+   * Reads from the given file name or pattern ("glob"). The returned source can be further
+   * configured by calling {@link #withSchema} to return a type other than {@link GenericRecord}.
    */
   public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
     return new AvroSource<>(
         fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class, null, null);
   }
 
-  /**
-   * Returns an {@link AvroSource} that's like this one but reads files containing records that
-   * conform to the given schema.
-   *
-   * <p>Does not modify this object.
-   */
+  /** Reads files containing records that conform to the given schema. */
   public AvroSource<GenericRecord> withSchema(String schema) {
     return new AvroSource<>(
         getFileOrPatternSpec(), getMinBundleSize(), schema, GenericRecord.class, codec, syncMarker);
   }
 
-  /**
-   * Returns an {@link AvroSource} that's like this one but reads files containing records that
-   * conform to the given schema.
-   *
-   * <p>Does not modify this object.
-   */
+  /** Like {@link #withSchema(String)}. */
   public AvroSource<GenericRecord> withSchema(Schema schema) {
     return new AvroSource<>(getFileOrPatternSpec(), getMinBundleSize(), schema.toString(),
         GenericRecord.class, codec, syncMarker);
   }
 
-  /**
-   * Returns an {@link AvroSource} that's like this one but reads files containing records of the
-   * type of the given class.
-   *
-   * <p>Does not modify this object.
-   */
+  /** Reads files containing records of the given class. */
   public <X> AvroSource<X> withSchema(Class<X> clazz) {
-    return new AvroSource<X>(getFileOrPatternSpec(), getMinBundleSize(),
+    return new AvroSource<>(getFileOrPatternSpec(), getMinBundleSize(),
         ReflectData.get().getSchema(clazz).toString(), clazz, codec, syncMarker);
   }
 
   /**
-   * Returns an {@link AvroSource} that's like this one but uses the supplied minimum bundle size.
-   * Refer to {@link OffsetBasedSource} for a description of {@code minBundleSize} and its use.
-   *
-   * <p>Does not modify this object.
+   * Sets the minimum bundle size. Refer to {@link OffsetBasedSource} for a description of {@code
+   * minBundleSize} and its use.
    */
   public AvroSource<T> withMinBundleSize(long minBundleSize) {
     return new AvroSource<>(
-        getFileOrPatternSpec(), minBundleSize, readSchemaString, type, codec, syncMarker);
+        getFileOrPatternSpec(), minBundleSize, readerSchemaString, type, codec, syncMarker);
   }
 
-  private AvroSource(String fileNameOrPattern, long minBundleSize, String schema, Class<T> type,
-      String codec, byte[] syncMarker) {
+  /** Constructor for FILEPATTERN mode. */
+  private AvroSource(
+      String fileNameOrPattern,
+      long minBundleSize,
+      String readerSchemaString,
+      Class<T> type,
+      String codec,
+      byte[] syncMarker) {
     super(fileNameOrPattern, minBundleSize);
-    this.readSchemaString = internSchemaString(schema);
+    this.readerSchemaString = internSchemaString(readerSchemaString);
     this.codec = codec;
     this.syncMarker = syncMarker;
     this.type = type;
-    this.fileSchemaString = null;
+    this.writerSchemaString = null;
   }
 
-  private AvroSource(Metadata metadata, long minBundleSize, long startOffset, long endOffset,
-      String schema, Class<T> type, String codec, byte[] syncMarker, String fileSchema) {
+  /** Constructor for SINGLE_FILE_OR_SUBRANGE mode. */
+  private AvroSource(
+      Metadata metadata,
+      long minBundleSize,
+      long startOffset,
+      long endOffset,
+      String readerSchemaString,
+      Class<T> type,
+      String codec,
+      byte[] syncMarker,
+      String writerSchemaString) {
     super(metadata, minBundleSize, startOffset, endOffset);
-    this.readSchemaString = internSchemaString(schema);
+    this.readerSchemaString = internSchemaString(readerSchemaString);
     this.codec = codec;
     this.syncMarker = syncMarker;
     this.type = type;
-    this.fileSchemaString = internSchemaString(fileSchema);
+    this.writerSchemaString = internSchemaString(writerSchemaString);
   }
 
   @Override
@@ -241,13 +231,12 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) {
     byte[] syncMarker = this.syncMarker;
     String codec = this.codec;
-    String readSchemaString = this.readSchemaString;
-    String fileSchemaString = this.fileSchemaString;
+    String writerSchemaString = this.writerSchemaString;
     // codec and syncMarker are initially null when the source is created, as they differ
     // across input files and must be read from the file. Here, when we are creating a source
     // for a subrange of a file, we can initialize these values. When the resulting AvroSource
     // is further split, they do not need to be read again.
-    if (codec == null || syncMarker == null || fileSchemaString == null) {
+    if (codec == null || syncMarker == null || writerSchemaString == null) {
       AvroMetadata metadata;
       try {
         metadata = readMetadataFromFile(fileMetadata.resourceId());
@@ -256,20 +245,23 @@ public class AvroSource<T> extends BlockBasedSource<T> {
       }
       codec = metadata.getCodec();
       syncMarker = metadata.getSyncMarker();
-      fileSchemaString = metadata.getSchemaString();
-      // If the source was created with a null schema, use the schema that we read from the file's
-      // metadata.
-      if (readSchemaString == null) {
-        readSchemaString = metadata.getSchemaString();
-      }
+      writerSchemaString = metadata.getSchemaString();
     }
-    // Note that if the fileSchemaString is equivalent to the readSchemaString, "intern"ing
+    // Note that if the writerSchemaString is equivalent to the readerSchemaString, "intern"ing
     // the string will occur within the constructor and return the same reference as the
-    // readSchemaString. This allows for Java to have an efficient serialization since it
+    // readerSchemaString. This allows for Java to have an efficient serialization since it
     // will only encode the schema once while just storing pointers to the encoded version
     // within this source.
-    return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, readSchemaString, type,
-        codec, syncMarker, fileSchemaString);
+    return new AvroSource<>(
+        fileMetadata,
+        getMinBundleSize(),
+        start,
+        end,
+        readerSchemaString,
+        type,
+        codec,
+        syncMarker,
+        writerSchemaString);
   }
 
   @Override
@@ -279,40 +271,13 @@ public class AvroSource<T> extends BlockBasedSource<T> {
 
   @Override
   public AvroCoder<T> getDefaultOutputCoder() {
-    if (coder == null) {
-      coder = AvroCoder.of(type, internOrParseSchemaString(readSchemaString));
-    }
-    return coder;
-  }
-
-  public String getSchema() {
-    return readSchemaString;
-  }
-
-  @VisibleForTesting
-  Schema getReadSchema() {
-    if (readSchemaString == null) {
-      return null;
-    }
-
-    // If the schema has not been parsed, parse it.
-    if (readSchema == null) {
-      readSchema = internOrParseSchemaString(readSchemaString);
-    }
-    return readSchema;
+    return AvroCoder.of(type, internOrParseSchemaString(readerSchemaString));
   }
 
   @VisibleForTesting
-  Schema getFileSchema() {
-    if (fileSchemaString == null) {
-      return null;
-    }
-
-    // If the schema has not been parsed, parse it.
-    if (fileSchema == null) {
-      fileSchema = internOrParseSchemaString(fileSchemaString);
-    }
-    return fileSchema;
+  @Nullable
+  String getReaderSchemaString() {
+    return readerSchemaString;
   }
 
   private byte[] getSyncMarker() {
@@ -427,14 +392,14 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   }
 
   private DatumReader<T> createDatumReader() {
-    Schema readSchema = getReadSchema();
-    Schema fileSchema = getFileSchema();
-    checkNotNull(readSchema, "No read schema has been initialized for source %s", this);
-    checkNotNull(fileSchema, "No file schema has been initialized for source %s", this);
+    checkNotNull(writerSchemaString, "No writer schema has been initialized for source %s", this);
+    Schema writerSchema = internOrParseSchemaString(writerSchemaString);
+    Schema readerSchema =
+        (readerSchemaString == null) ? writerSchema : internOrParseSchemaString(readerSchemaString);
     if (type == GenericRecord.class) {
-      return new GenericDatumReader<>(fileSchema, readSchema);
+      return new GenericDatumReader<>(writerSchema, readerSchema);
     } else {
-      return new ReflectDatumReader<>(fileSchema, readSchema);
+      return new ReflectDatumReader<>(writerSchema, readerSchema);
     }
   }
 
@@ -477,16 +442,16 @@ public class AvroSource<T> extends BlockBasedSource<T> {
             getMinBundleSize(),
             getStartOffset(),
             getEndOffset(),
-            readSchemaString,
+            readerSchemaString,
             type,
             codec,
             syncMarker,
-            fileSchemaString);
+            writerSchemaString);
       case FILEPATTERN:
         return new AvroSource<>(
             getFileOrPatternSpec(),
             getMinBundleSize(),
-            readSchemaString,
+            readerSchemaString,
             type,
             codec,
             syncMarker);

http://git-wip-us.apache.org/repos/asf/beam/blob/c52a908c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index d6facba..0fc2b3e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -445,33 +445,11 @@ public class AvroSourceTest {
 
     AvroSource<GenericRecord> sourceA = AvroSource.from(filename).withSchema(schemaA);
     AvroSource<GenericRecord> sourceB = AvroSource.from(filename).withSchema(schemaB);
-    assertSame(sourceA.getSchema(), sourceB.getSchema());
+    assertSame(sourceA.getReaderSchemaString(), sourceB.getReaderSchemaString());
 
     // Ensure that deserialization still goes through interning
     AvroSource<GenericRecord> sourceC = SerializableUtils.clone(sourceB);
-    assertSame(sourceA.getSchema(), sourceC.getSchema());
-  }
-
-  @Test
-  public void testSchemaIsInterned() throws Exception {
-    List<Bird> birds = createRandomRecords(100);
-    String filename = generateTestFile("tmp.avro", birds, SyncBehavior.SYNC_DEFAULT, 0,
-        AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
-    Metadata fileMetadata = FileSystems.matchSingleFileSpec(filename);
-    String schemaA = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
-    String schemaB = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
-    assertNotSame(schemaA, schemaB);
-
-    AvroSource<GenericRecord> sourceA = (AvroSource<GenericRecord>) AvroSource.from(filename)
-        .withSchema(schemaA).createForSubrangeOfFile(fileMetadata, 0L, 0L);
-    AvroSource<GenericRecord> sourceB = (AvroSource<GenericRecord>) AvroSource.from(filename)
-        .withSchema(schemaB).createForSubrangeOfFile(fileMetadata, 0L, 0L);
-    assertSame(sourceA.getReadSchema(), sourceA.getFileSchema());
-    assertSame(sourceA.getReadSchema(), sourceB.getReadSchema());
-    assertSame(sourceA.getReadSchema(), sourceB.getFileSchema());
-
-    // Schemas are transient and not serialized thus we don't need to worry about interning
-    // after deserialization.
+    assertSame(sourceA.getReaderSchemaString(), sourceC.getReaderSchemaString());
   }
 
   private void assertEqualsWithGeneric(List<Bird> expected, List<GenericRecord> actual) {