You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/26 00:51:52 UTC

[3/4] beam git commit: Adds ValueProvider support to AvroIO.Read

Adds ValueProvider support to AvroIO.Read


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e80c83b2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e80c83b2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e80c83b2

Branch: refs/heads/master
Commit: e80c83b2a0a1cf55aa8a452a02a76c9dc13697cc
Parents: 71196ec
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jul 21 12:38:17 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jul 25 17:36:49 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 49 ++++++++------------
 .../java/org/apache/beam/sdk/io/AvroSource.java | 24 +++++++---
 .../apache/beam/sdk/io/BlockBasedSource.java    |  6 +++
 3 files changed, 42 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e80c83b2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 89cadbd..d308c85 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableMap;
@@ -36,7 +37,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -185,10 +185,10 @@ public class AvroIO {
         .setWindowedWrites(false);
   }
 
-  /** Implementation of {@link #read}. */
+  /** Implementation of {@link #read} and {@link #readGenericRecords}. */
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
-    @Nullable abstract String getFilepattern();
+    @Nullable abstract ValueProvider<String> getFilepattern();
     @Nullable abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
 
@@ -196,7 +196,7 @@ public class AvroIO {
 
     @AutoValue.Builder
     abstract static class Builder<T> {
-      abstract Builder<T> setFilepattern(String filepattern);
+      abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
       abstract Builder<T> setRecordClass(Class<T> recordClass);
       abstract Builder<T> setSchema(Schema schema);
 
@@ -204,45 +204,34 @@ public class AvroIO {
     }
 
     /** Reads from the given filename or filepattern. */
-    public Read<T> from(String filepattern) {
+    public Read<T> from(ValueProvider<String> filepattern) {
       return toBuilder().setFilepattern(filepattern).build();
     }
 
+    /** Like {@link #from(ValueProvider)}. */
+    public Read<T> from(String filepattern) {
+      return from(StaticValueProvider.of(filepattern));
+    }
+
     @Override
     public PCollection<T> expand(PBegin input) {
-      if (getFilepattern() == null) {
-        throw new IllegalStateException(
-            "need to set the filepattern of an AvroIO.Read transform");
-      }
-      if (getSchema() == null) {
-        throw new IllegalStateException("need to set the schema of an AvroIO.Read transform");
-      }
+      checkNotNull(getFilepattern(), "filepattern");
+      checkNotNull(getSchema(), "schema");
 
       @SuppressWarnings("unchecked")
-      Bounded<T> read =
+      AvroSource<T> source =
           getRecordClass() == GenericRecord.class
-              ? (Bounded<T>) org.apache.beam.sdk.io.Read.from(
-                  AvroSource.from(getFilepattern()).withSchema(getSchema()))
-              : org.apache.beam.sdk.io.Read.from(
-                  AvroSource.from(getFilepattern()).withSchema(getRecordClass()));
-
-      PCollection<T> pcol = input.getPipeline().apply("Read", read);
-      // Honor the default output coder that would have been used by this PTransform.
-      pcol.setCoder(getDefaultOutputCoder());
-      return pcol;
+              ? (AvroSource<T>) AvroSource.from(getFilepattern()).withSchema(getSchema())
+              : AvroSource.from(getFilepattern()).withSchema(getRecordClass());
+
+      return input.getPipeline().apply("Read", org.apache.beam.sdk.io.Read.from(source));
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      builder
-        .addIfNotNull(DisplayData.item("filePattern", getFilepattern())
-          .withLabel("Input File Pattern"));
-    }
-
-    @Override
-    protected Coder<T> getDefaultOutputCoder() {
-      return AvroCoder.of(getRecordClass(), getSchema());
+      builder.addIfNotNull(
+          DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e80c83b2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 7cd97a8..a98d870 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -56,6 +56,7 @@ import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;
@@ -140,26 +141,31 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    * Reads from the given file name or pattern ("glob"). The returned source can be further
    * configured by calling {@link #withSchema} to return a type other than {@link GenericRecord}.
    */
-  public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
+  public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPattern) {
     return new AvroSource<>(fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class);
   }
 
+  /** Like {@link #from(ValueProvider)}. */
+  public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
+    return from(ValueProvider.StaticValueProvider.of(fileNameOrPattern));
+  }
+
   /** Reads files containing records that conform to the given schema. */
   public AvroSource<GenericRecord> withSchema(String schema) {
     return new AvroSource<>(
-        getFileOrPatternSpec(), getMinBundleSize(), schema, GenericRecord.class);
+        getFileOrPatternSpecProvider(), getMinBundleSize(), schema, GenericRecord.class);
   }
 
   /** Like {@link #withSchema(String)}. */
   public AvroSource<GenericRecord> withSchema(Schema schema) {
     return new AvroSource<>(
-        getFileOrPatternSpec(), getMinBundleSize(), schema.toString(), GenericRecord.class);
+        getFileOrPatternSpecProvider(), getMinBundleSize(), schema.toString(), GenericRecord.class);
   }
 
   /** Reads files containing records of the given class. */
   public <X> AvroSource<X> withSchema(Class<X> clazz) {
     return new AvroSource<>(
-        getFileOrPatternSpec(),
+        getFileOrPatternSpecProvider(),
         getMinBundleSize(),
         ReflectData.get().getSchema(clazz).toString(),
         clazz);
@@ -170,12 +176,16 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    * minBundleSize} and its use.
    */
   public AvroSource<T> withMinBundleSize(long minBundleSize) {
-    return new AvroSource<>(getFileOrPatternSpec(), minBundleSize, readerSchemaString, type);
+    return new AvroSource<>(
+        getFileOrPatternSpecProvider(), minBundleSize, readerSchemaString, type);
   }
 
   /** Constructor for FILEPATTERN mode. */
   private AvroSource(
-      String fileNameOrPattern, long minBundleSize, String readerSchemaString, Class<T> type) {
+      ValueProvider<String> fileNameOrPattern,
+      long minBundleSize,
+      String readerSchemaString,
+      Class<T> type) {
     super(fileNameOrPattern, minBundleSize);
     this.readerSchemaString = internSchemaString(readerSchemaString);
     this.type = type;
@@ -378,7 +388,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
             type);
       case FILEPATTERN:
         return new AvroSource<>(
-            getFileOrPatternSpec(), getMinBundleSize(), readerSchemaString, type);
+            getFileOrPatternSpecProvider(), getMinBundleSize(), readerSchemaString, type);
         default:
           throw new InvalidObjectException(
               String.format("Unknown mode %s for AvroSource %s", getMode(), this));

http://git-wip-us.apache.org/repos/asf/beam/blob/e80c83b2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
index cf6671e..25e8483 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
@@ -23,6 +23,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 
 /**
@@ -69,6 +70,11 @@ public abstract class BlockBasedSource<T> extends FileBasedSource<T> {
     super(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
   }
 
+  /** Like {@link #BlockBasedSource(String, long)}. */
+  public BlockBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
+    super(fileOrPatternSpec, minBundleSize);
+  }
+
   /**
    * Creates a {@code BlockBasedSource} for a single file. Subclasses must call this constructor
    * when implementing {@link BlockBasedSource#createForSubrangeOfFile}. See documentation in