Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/21 00:06:03 UTC

[2/3] beam git commit: Minor changes to AvroSource in preparation for refactoring

Minor changes to AvroSource in preparation for refactoring
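
Context for the renames below (not part of the commit itself): the fields formerly called
readSchema/fileSchema are renamed to Avro's standard "reader schema" / "writer schema"
terminology, the reader schema becomes optional (falling back to the writer schema found in
the file header), and the lazily initialized transient caches are dropped. The following is a
minimal, standalone sketch of that writer-vs-reader schema resolution; the record schemas and
field names here are made up for illustration only.

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaResolutionSketch {
  public static void main(String[] args) throws Exception {
    // Writer schema: the schema the data was written with (stored in the file header).
    Schema writerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Bird\",\"fields\":["
            + "{\"name\":\"species\",\"type\":\"string\"}]}");
    // Reader schema: the schema the caller wants records decoded into; the extra field
    // carries a default so resolution against the writer schema succeeds.
    Schema readerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Bird\",\"fields\":["
            + "{\"name\":\"species\",\"type\":\"string\"},"
            + "{\"name\":\"count\",\"type\":\"int\",\"default\":0}]}");

    // Encode one record using only the writer schema.
    GenericRecord written = new GenericRecordBuilder(writerSchema).set("species", "owl").build();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writerSchema).write(written, encoder);
    encoder.flush();

    // Decode with both schemas, as the updated createDatumReader() does; when no reader
    // schema is configured, the source passes the writer schema for both.
    GenericDatumReader<GenericRecord> datumReader =
        new GenericDatumReader<>(writerSchema, readerSchema);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord read = datumReader.read(null, decoder);
    System.out.println(read); // {"species": "owl", "count": 0}
  }
}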


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c52a908c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c52a908c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c52a908c

Branch: refs/heads/master
Commit: c52a908cba7765e120a94909ab02c548d1a124ad
Parents: 7e63d2c
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jul 18 13:40:52 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 20 16:59:11 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroSource.java | 171 ++++++++-----------
 .../org/apache/beam/sdk/io/AvroSourceTest.java  |  26 +--
 2 files changed, 70 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c52a908c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 575218b..0634774 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -37,6 +37,7 @@ import java.util.Map;
 import java.util.WeakHashMap;
 import java.util.zip.Inflater;
 import java.util.zip.InflaterInputStream;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
@@ -127,15 +128,16 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   // The default sync interval is 64k.
   private static final long DEFAULT_MIN_BUNDLE_SIZE = 2 * DataFileConstants.DEFAULT_SYNC_INTERVAL;
 
-  // The JSON schema used to encode records.
-  private final String readSchemaString;
+  // The type of the records contained in the file.
+  private final Class<T> type;
+
+  // The JSON schema used to decode records.
+  @Nullable
+  private final String readerSchemaString;
 
   // The JSON schema that was used to write the source Avro file (may differ from the schema we will
   // use to read from it).
-  private final String fileSchemaString;
-
-  // The type of the records contained in the file.
-  private final Class<T> type;
+  private final String writerSchemaString;
 
   // The following metadata fields are not user-configurable. They are extracted from the object
   // container file header upon subsource creation.
@@ -147,87 +149,75 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   // The object container file's 16-byte sync marker.
   private final byte[] syncMarker;
 
-  // Default output coder, lazily initialized.
-  private transient AvroCoder<T> coder = null;
-
-  // Schema of the file, lazily initialized.
-  private transient Schema fileSchema;
-
-  // Schema used to encode records, lazily initialized.
-  private transient Schema readSchema;
-
   /**
-   * Creates an {@link AvroSource} that reads from the given file name or pattern ("glob"). The
-   * returned source can be further configured by calling {@link #withSchema} to return a type other
-   * than {@link GenericRecord}.
+   * Reads from the given file name or pattern ("glob"). The returned source can be further
+   * configured by calling {@link #withSchema} to return a type other than {@link GenericRecord}.
    */
   public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
     return new AvroSource<>(
         fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class, null, null);
   }
 
-  /**
-   * Returns an {@link AvroSource} that's like this one but reads files containing records that
-   * conform to the given schema.
-   *
-   * <p>Does not modify this object.
-   */
+  /** Reads files containing records that conform to the given schema. */
   public AvroSource<GenericRecord> withSchema(String schema) {
     return new AvroSource<>(
         getFileOrPatternSpec(), getMinBundleSize(), schema, GenericRecord.class, codec, syncMarker);
   }
 
-  /**
-   * Returns an {@link AvroSource} that's like this one but reads files containing records that
-   * conform to the given schema.
-   *
-   * <p>Does not modify this object.
-   */
+  /** Like {@link #withSchema(String)}. */
   public AvroSource<GenericRecord> withSchema(Schema schema) {
     return new AvroSource<>(getFileOrPatternSpec(), getMinBundleSize(), schema.toString(),
         GenericRecord.class, codec, syncMarker);
   }
 
-  /**
-   * Returns an {@link AvroSource} that's like this one but reads files containing records of the
-   * type of the given class.
-   *
-   * <p>Does not modify this object.
-   */
+  /** Reads files containing records of the given class. */
   public <X> AvroSource<X> withSchema(Class<X> clazz) {
-    return new AvroSource<X>(getFileOrPatternSpec(), getMinBundleSize(),
+    return new AvroSource<>(getFileOrPatternSpec(), getMinBundleSize(),
         ReflectData.get().getSchema(clazz).toString(), clazz, codec, syncMarker);
   }
 
   /**
-   * Returns an {@link AvroSource} that's like this one but uses the supplied minimum bundle size.
-   * Refer to {@link OffsetBasedSource} for a description of {@code minBundleSize} and its use.
-   *
-   * <p>Does not modify this object.
+   * Sets the minimum bundle size. Refer to {@link OffsetBasedSource} for a description of {@code
+   * minBundleSize} and its use.
    */
   public AvroSource<T> withMinBundleSize(long minBundleSize) {
     return new AvroSource<>(
-        getFileOrPatternSpec(), minBundleSize, readSchemaString, type, codec, syncMarker);
+        getFileOrPatternSpec(), minBundleSize, readerSchemaString, type, codec, syncMarker);
   }
 
-  private AvroSource(String fileNameOrPattern, long minBundleSize, String schema, Class<T> type,
-      String codec, byte[] syncMarker) {
+  /** Constructor for FILEPATTERN mode. */
+  private AvroSource(
+      String fileNameOrPattern,
+      long minBundleSize,
+      String readerSchemaString,
+      Class<T> type,
+      String codec,
+      byte[] syncMarker) {
     super(fileNameOrPattern, minBundleSize);
-    this.readSchemaString = internSchemaString(schema);
+    this.readerSchemaString = internSchemaString(readerSchemaString);
     this.codec = codec;
     this.syncMarker = syncMarker;
     this.type = type;
-    this.fileSchemaString = null;
+    this.writerSchemaString = null;
   }
 
-  private AvroSource(Metadata metadata, long minBundleSize, long startOffset, long endOffset,
-      String schema, Class<T> type, String codec, byte[] syncMarker, String fileSchema) {
+  /** Constructor for SINGLE_FILE_OR_SUBRANGE mode. */
+  private AvroSource(
+      Metadata metadata,
+      long minBundleSize,
+      long startOffset,
+      long endOffset,
+      String readerSchemaString,
+      Class<T> type,
+      String codec,
+      byte[] syncMarker,
+      String writerSchemaString) {
     super(metadata, minBundleSize, startOffset, endOffset);
-    this.readSchemaString = internSchemaString(schema);
+    this.readerSchemaString = internSchemaString(readerSchemaString);
     this.codec = codec;
     this.syncMarker = syncMarker;
     this.type = type;
-    this.fileSchemaString = internSchemaString(fileSchema);
+    this.writerSchemaString = internSchemaString(writerSchemaString);
   }
 
   @Override
@@ -241,13 +231,12 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) {
     byte[] syncMarker = this.syncMarker;
     String codec = this.codec;
-    String readSchemaString = this.readSchemaString;
-    String fileSchemaString = this.fileSchemaString;
+    String writerSchemaString = this.writerSchemaString;
     // codec and syncMarker are initially null when the source is created, as they differ
     // across input files and must be read from the file. Here, when we are creating a source
     // for a subrange of a file, we can initialize these values. When the resulting AvroSource
     // is further split, they do not need to be read again.
-    if (codec == null || syncMarker == null || fileSchemaString == null) {
+    if (codec == null || syncMarker == null || writerSchemaString == null) {
       AvroMetadata metadata;
       try {
         metadata = readMetadataFromFile(fileMetadata.resourceId());
@@ -256,20 +245,23 @@ public class AvroSource<T> extends BlockBasedSource<T> {
       }
       codec = metadata.getCodec();
       syncMarker = metadata.getSyncMarker();
-      fileSchemaString = metadata.getSchemaString();
-      // If the source was created with a null schema, use the schema that we read from the file's
-      // metadata.
-      if (readSchemaString == null) {
-        readSchemaString = metadata.getSchemaString();
-      }
+      writerSchemaString = metadata.getSchemaString();
     }
-    // Note that if the fileSchemaString is equivalent to the readSchemaString, "intern"ing
+    // Note that if the writerSchemaString is equivalent to the readerSchemaString, "intern"ing
     // the string will occur within the constructor and return the same reference as the
-    // readSchemaString. This allows for Java to have an efficient serialization since it
+    // readerSchemaString. This allows for Java to have an efficient serialization since it
     // will only encode the schema once while just storing pointers to the encoded version
     // within this source.
-    return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, readSchemaString, type,
-        codec, syncMarker, fileSchemaString);
+    return new AvroSource<>(
+        fileMetadata,
+        getMinBundleSize(),
+        start,
+        end,
+        readerSchemaString,
+        type,
+        codec,
+        syncMarker,
+        writerSchemaString);
   }
 
   @Override
@@ -279,40 +271,13 @@ public class AvroSource<T> extends BlockBasedSource<T> {
 
   @Override
   public AvroCoder<T> getDefaultOutputCoder() {
-    if (coder == null) {
-      coder = AvroCoder.of(type, internOrParseSchemaString(readSchemaString));
-    }
-    return coder;
-  }
-
-  public String getSchema() {
-    return readSchemaString;
-  }
-
-  @VisibleForTesting
-  Schema getReadSchema() {
-    if (readSchemaString == null) {
-      return null;
-    }
-
-    // If the schema has not been parsed, parse it.
-    if (readSchema == null) {
-      readSchema = internOrParseSchemaString(readSchemaString);
-    }
-    return readSchema;
+    return AvroCoder.of(type, internOrParseSchemaString(readerSchemaString));
   }
 
   @VisibleForTesting
-  Schema getFileSchema() {
-    if (fileSchemaString == null) {
-      return null;
-    }
-
-    // If the schema has not been parsed, parse it.
-    if (fileSchema == null) {
-      fileSchema = internOrParseSchemaString(fileSchemaString);
-    }
-    return fileSchema;
+  @Nullable
+  String getReaderSchemaString() {
+    return readerSchemaString;
   }
 
   private byte[] getSyncMarker() {
@@ -427,14 +392,14 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   }
 
   private DatumReader<T> createDatumReader() {
-    Schema readSchema = getReadSchema();
-    Schema fileSchema = getFileSchema();
-    checkNotNull(readSchema, "No read schema has been initialized for source %s", this);
-    checkNotNull(fileSchema, "No file schema has been initialized for source %s", this);
+    checkNotNull(writerSchemaString, "No writer schema has been initialized for source %s", this);
+    Schema writerSchema = internOrParseSchemaString(writerSchemaString);
+    Schema readerSchema =
+        (readerSchemaString == null) ? writerSchema : internOrParseSchemaString(readerSchemaString);
     if (type == GenericRecord.class) {
-      return new GenericDatumReader<>(fileSchema, readSchema);
+      return new GenericDatumReader<>(writerSchema, readerSchema);
     } else {
-      return new ReflectDatumReader<>(fileSchema, readSchema);
+      return new ReflectDatumReader<>(writerSchema, readerSchema);
     }
   }
 
@@ -477,16 +442,16 @@ public class AvroSource<T> extends BlockBasedSource<T> {
             getMinBundleSize(),
             getStartOffset(),
             getEndOffset(),
-            readSchemaString,
+            readerSchemaString,
             type,
             codec,
             syncMarker,
-            fileSchemaString);
+            writerSchemaString);
       case FILEPATTERN:
         return new AvroSource<>(
             getFileOrPatternSpec(),
             getMinBundleSize(),
-            readSchemaString,
+            readerSchemaString,
             type,
             codec,
             syncMarker);

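For context (again, not part of the commit): a hedged sketch of how the refactored source is
typically consumed in a pipeline. The file pattern and the commented-out reader schema are
hypothetical; the only APIs assumed beyond this diff are Pipeline, Read.from, and
PipelineOptionsFactory from the Beam Java SDK.

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class AvroSourceUsageSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // With no explicit reader schema, records are decoded with the writer schema read from
    // each file's header (the writerSchemaString filled in by createForSubrangeOfFile).
    AvroSource<GenericRecord> source = AvroSource.from("/tmp/birds-*.avro");

    // Alternatively, supply a reader schema and let Avro resolve it against the writer schema:
    // AvroSource<GenericRecord> source =
    //     AvroSource.from("/tmp/birds-*.avro").withSchema(readerSchemaJson);

    PCollection<GenericRecord> records = p.apply(Read.from(source));
    p.run().waitUntilFinish();
  }
}
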
http://git-wip-us.apache.org/repos/asf/beam/blob/c52a908c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index d6facba..0fc2b3e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -445,33 +445,11 @@ public class AvroSourceTest {
 
     AvroSource<GenericRecord> sourceA = AvroSource.from(filename).withSchema(schemaA);
     AvroSource<GenericRecord> sourceB = AvroSource.from(filename).withSchema(schemaB);
-    assertSame(sourceA.getSchema(), sourceB.getSchema());
+    assertSame(sourceA.getReaderSchemaString(), sourceB.getReaderSchemaString());
 
     // Ensure that deserialization still goes through interning
     AvroSource<GenericRecord> sourceC = SerializableUtils.clone(sourceB);
-    assertSame(sourceA.getSchema(), sourceC.getSchema());
-  }
-
-  @Test
-  public void testSchemaIsInterned() throws Exception {
-    List<Bird> birds = createRandomRecords(100);
-    String filename = generateTestFile("tmp.avro", birds, SyncBehavior.SYNC_DEFAULT, 0,
-        AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
-    Metadata fileMetadata = FileSystems.matchSingleFileSpec(filename);
-    String schemaA = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
-    String schemaB = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
-    assertNotSame(schemaA, schemaB);
-
-    AvroSource<GenericRecord> sourceA = (AvroSource<GenericRecord>) AvroSource.from(filename)
-        .withSchema(schemaA).createForSubrangeOfFile(fileMetadata, 0L, 0L);
-    AvroSource<GenericRecord> sourceB = (AvroSource<GenericRecord>) AvroSource.from(filename)
-        .withSchema(schemaB).createForSubrangeOfFile(fileMetadata, 0L, 0L);
-    assertSame(sourceA.getReadSchema(), sourceA.getFileSchema());
-    assertSame(sourceA.getReadSchema(), sourceB.getReadSchema());
-    assertSame(sourceA.getReadSchema(), sourceB.getFileSchema());
-
-    // Schemas are transient and not serialized thus we don't need to worry about interning
-    // after deserialization.
+    assertSame(sourceA.getReaderSchemaString(), sourceC.getReaderSchemaString());
   }
 
   private void assertEqualsWithGeneric(List<Bird> expected, List<GenericRecord> actual) {
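
A final aside on the interning behavior the remaining test exercises (a sketch under my own
assumptions, not code from the commit): Java's ObjectOutputStream writes a back-reference
handle the second time it encounters the same object instance, so a subrange source whose
reader and writer schema strings are one interned instance serializes the schema JSON only
once. The Holder class and sizes below are purely illustrative.

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class InterningSketch {
  // Stand-in for a source holding reader and writer schema strings.
  static class Holder implements Serializable {
    final String readerSchemaString;
    final String writerSchemaString;
    Holder(String reader, String writer) {
      this.readerSchemaString = reader;
      this.writerSchemaString = writer;
    }
  }

  static int serializedSize(Object o) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(o);
    }
    return bytes.size();
  }

  public static void main(String[] args) throws Exception {
    // Pretend this is a large schema JSON string.
    String schema = new String(new char[10_000]).replace('\0', 'x');

    // Same reference for both fields: the text is written once, plus a back-reference handle.
    int shared = serializedSize(new Holder(schema, schema));

    // Equal but distinct strings: the text is written twice.
    int distinct = serializedSize(new Holder(schema, new String(schema)));

    System.out.println("shared=" + shared + " distinct=" + distinct); // shared is roughly half
  }
}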