You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/30 19:11:38 UTC

[3/6] beam git commit: Adds EmptyMatchTreatment to AvroIO

Adds EmptyMatchTreatment to AvroIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82b08523
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82b08523
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82b08523

Branch: refs/heads/master
Commit: 82b08523084aa6f20ea3c4d5b8b89cdbe0378060
Parents: 84eb7f3
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 16 14:40:52 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 11:55:18 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 161 ++++++++++++++-----
 .../java/org/apache/beam/sdk/io/AvroSource.java |  22 ++-
 .../apache/beam/sdk/io/BlockBasedSource.java    |  27 +++-
 .../org/apache/beam/sdk/io/FileBasedSource.java |   4 +
 4 files changed, 171 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/82b08523/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index d4a7cbb..9601a7d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -69,6 +70,12 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  *
  * <p>See {@link FileSystems} for information on supported file systems and filepatterns.
  *
+ * <h3>Filepattern expansion</h3>
+ *
+ * <p>By default, {@link #read} and {@link #parseGenericRecords} prohibit filepatterns that match
+ * no files, while {@link #readAll} and {@link #parseAllGenericRecords} allow them if the pattern
+ * has a glob wildcard. Configure this via {@link Read#withEmptyMatchTreatment} or its analogues.
+ *
  * <h3>Reading records of a known schema</h3>
  *
  * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
@@ -239,6 +246,7 @@ public class AvroIO {
    */
   public static <T> Read<T> read(Class<T> recordClass) {
     return new AutoValue_AvroIO_Read.Builder<T>()
+        .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
         .setRecordClass(recordClass)
         .setSchema(ReflectData.get().getSchema(recordClass))
         .setHintMatchesManyFiles(false)
@@ -248,6 +256,7 @@ public class AvroIO {
   /** Like {@link #read}, but reads each filepattern in the input {@link PCollection}. */
   public static <T> ReadAll<T> readAll(Class<T> recordClass) {
     return new AutoValue_AvroIO_ReadAll.Builder<T>()
+        .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
         .setRecordClass(recordClass)
         .setSchema(ReflectData.get().getSchema(recordClass))
         // 64MB is a reasonable value that allows to amortize the cost of opening files,
@@ -260,6 +269,7 @@ public class AvroIO {
   /** Reads Avro file(s) containing records of the specified schema. */
   public static Read<GenericRecord> readGenericRecords(Schema schema) {
     return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
+        .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
         .setRecordClass(GenericRecord.class)
         .setSchema(schema)
         .setHintMatchesManyFiles(false)
@@ -272,6 +282,7 @@ public class AvroIO {
    */
   public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) {
     return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>()
+        .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
         .setRecordClass(GenericRecord.class)
         .setSchema(schema)
         .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
@@ -300,6 +311,7 @@ public class AvroIO {
    */
   public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
     return new AutoValue_AvroIO_Parse.Builder<T>()
+        .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
         .setParseFn(parseFn)
         .setHintMatchesManyFiles(false)
         .build();
@@ -312,6 +324,7 @@ public class AvroIO {
   public static <T> ParseAll<T> parseAllGenericRecords(
       SerializableFunction<GenericRecord, T> parseFn) {
     return new AutoValue_AvroIO_ParseAll.Builder<T>()
+        .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
         .setParseFn(parseFn)
         .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
         .build();
@@ -392,6 +405,7 @@ public class AvroIO {
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
     @Nullable abstract ValueProvider<String> getFilepattern();
+    abstract EmptyMatchTreatment getEmptyMatchTreatment();
     @Nullable abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
     abstract boolean getHintMatchesManyFiles();
@@ -401,6 +415,7 @@ public class AvroIO {
     @AutoValue.Builder
     abstract static class Builder<T> {
       abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+      abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
       abstract Builder<T> setRecordClass(Class<T> recordClass);
       abstract Builder<T> setSchema(Schema schema);
       abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
@@ -424,6 +439,13 @@ public class AvroIO {
     }
 
     /**
+     * Configures whether or not a filepattern matching no files is allowed.
+     */
+    public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
+    /**
      * Hints that the filepattern specified in {@link #from(String)} matches a very large number of
      * files.
      *
@@ -440,37 +462,48 @@ public class AvroIO {
     public PCollection<T> expand(PBegin input) {
       checkNotNull(getFilepattern(), "filepattern");
       checkNotNull(getSchema(), "schema");
-      if (getHintMatchesManyFiles()) {
-        ReadAll<T> readAll =
-            (getRecordClass() == GenericRecord.class)
-                ? (ReadAll<T>) readAllGenericRecords(getSchema())
-                : readAll(getRecordClass());
-        return input
-            .apply(Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
-            .apply(readAll);
-      } else {
-        return input
-            .getPipeline()
-            .apply(
-                "Read",
-                org.apache.beam.sdk.io.Read.from(
-                    createSource(getFilepattern(), getRecordClass(), getSchema())));
+
+      if (!getHintMatchesManyFiles()) {
+        return input.apply(
+            "Read",
+            org.apache.beam.sdk.io.Read.from(
+                createSource(
+                    getFilepattern(), getEmptyMatchTreatment(), getRecordClass(), getSchema())));
       }
+      // The filepattern is hinted to match many files, so expand it via ReadAll instead.
+
+      ReadAll<T> readAll =
+          (getRecordClass() == GenericRecord.class)
+              ? (ReadAll<T>) readAllGenericRecords(getSchema())
+              : readAll(getRecordClass());
+      readAll = readAll.withEmptyMatchTreatment(getEmptyMatchTreatment());
+      return input
+          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+          .apply("Via ReadAll", readAll);
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      builder.addIfNotNull(
-          DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"));
+      builder
+          .addIfNotNull(
+              DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+          .add(
+              DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"));
     }
 
     @SuppressWarnings("unchecked")
     private static <T> AvroSource<T> createSource(
-        ValueProvider<String> filepattern, Class<T> recordClass, Schema schema) {
+        ValueProvider<String> filepattern,
+        EmptyMatchTreatment emptyMatchTreatment,
+        Class<T> recordClass,
+        Schema schema) {
+      AvroSource<?> source =
+          AvroSource.from(filepattern).withEmptyMatchTreatment(emptyMatchTreatment);
       return recordClass == GenericRecord.class
-          ? (AvroSource<T>) AvroSource.from(filepattern).withSchema(schema)
-          : AvroSource.from(filepattern).withSchema(recordClass);
+          ? (AvroSource<T>) source.withSchema(schema)
+          : source.withSchema(recordClass);
     }
   }
 
@@ -479,6 +512,7 @@ public class AvroIO {
   /** Implementation of {@link #readAll}. */
   @AutoValue
   public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+    abstract EmptyMatchTreatment getEmptyMatchTreatment();
     @Nullable abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
     abstract long getDesiredBundleSizeBytes();
@@ -487,6 +521,7 @@ public class AvroIO {
 
     @AutoValue.Builder
     abstract static class Builder<T> {
+      abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
       abstract Builder<T> setRecordClass(Class<T> recordClass);
       abstract Builder<T> setSchema(Schema schema);
       abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
@@ -494,6 +529,11 @@ public class AvroIO {
       abstract ReadAll<T> build();
     }
 
+    /** Like {@link Read#withEmptyMatchTreatment}. */
+    public ReadAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
     @VisibleForTesting
     ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
       return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
@@ -502,24 +542,40 @@ public class AvroIO {
     @Override
     public PCollection<T> expand(PCollection<String> input) {
       checkNotNull(getSchema(), "schema");
+      Match.Filepatterns matchFilepatterns =
+          Match.filepatterns().withEmptyMatchTreatment(getEmptyMatchTreatment());
+
       return input
-          .apply(Match.filepatterns())
+          .apply(matchFilepatterns)
           .apply(
               "Read all via FileBasedSource",
               new ReadAllViaFileBasedSource<>(
                   SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */,
                   getDesiredBundleSizeBytes(),
-                  new CreateSourceFn<>(getRecordClass(), getSchema().toString())))
+                  new CreateSourceFn<>(
+                      getEmptyMatchTreatment(), getRecordClass(), getSchema().toString())))
           .setCoder(AvroCoder.of(getRecordClass(), getSchema()));
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(
+              DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"));
+    }
   }
 
   private static class CreateSourceFn<T>
       implements SerializableFunction<String, FileBasedSource<T>> {
+    private final EmptyMatchTreatment emptyMatchTreatment;
     private final Class<T> recordClass;
     private final Supplier<Schema> schemaSupplier;
 
-    public CreateSourceFn(Class<T> recordClass, String jsonSchema) {
+    public CreateSourceFn(
+        EmptyMatchTreatment emptyMatchTreatment, Class<T> recordClass, String jsonSchema) {
+      this.emptyMatchTreatment = emptyMatchTreatment;
       this.recordClass = recordClass;
       this.schemaSupplier = AvroUtils.serializableSchemaSupplier(jsonSchema);
     }
@@ -527,7 +583,7 @@ public class AvroIO {
     @Override
     public FileBasedSource<T> apply(String input) {
       return Read.createSource(
-          StaticValueProvider.of(input), recordClass, schemaSupplier.get());
+          StaticValueProvider.of(input), emptyMatchTreatment, recordClass, schemaSupplier.get());
     }
   }
 
@@ -537,6 +593,7 @@ public class AvroIO {
   @AutoValue
   public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
     @Nullable abstract ValueProvider<String> getFilepattern();
+    abstract EmptyMatchTreatment getEmptyMatchTreatment();
     abstract SerializableFunction<GenericRecord, T> getParseFn();
     @Nullable abstract Coder<T> getCoder();
     abstract boolean getHintMatchesManyFiles();
@@ -546,6 +603,7 @@ public class AvroIO {
     @AutoValue.Builder
     abstract static class Builder<T> {
       abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+      abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
       abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
       abstract Builder<T> setCoder(Coder<T> coder);
       abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles);
@@ -563,6 +621,11 @@ public class AvroIO {
       return toBuilder().setFilepattern(filepattern).build();
     }
 
+    /** Like {@link Read#withEmptyMatchTreatment}. */
+    public Parse<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
     /** Sets a coder for the result of the parse function. */
     public Parse<T> withCoder(Coder<T> coder) {
       return toBuilder().setCoder(coder).build();
@@ -577,14 +640,20 @@ public class AvroIO {
     public PCollection<T> expand(PBegin input) {
       checkNotNull(getFilepattern(), "filepattern");
       Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
-      if (getHintMatchesManyFiles()) {
-        return input
-            .apply(Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
-            .apply(parseAllGenericRecords(getParseFn()).withCoder(getCoder()));
+
+      if (!getHintMatchesManyFiles()) {
+        return input.apply(org.apache.beam.sdk.io.Read.from(
+            AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)
+                .withEmptyMatchTreatment(getEmptyMatchTreatment())));
       }
-      return input.apply(
-          org.apache.beam.sdk.io.Read.from(
-              AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)));
+      // The filepattern is hinted to match many files, so expand it via ParseAll instead.
+      ParseAll<T> parseAll =
+          parseAllGenericRecords(getParseFn())
+              .withCoder(coder)
+              .withEmptyMatchTreatment(getEmptyMatchTreatment());
+      return input
+          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+          .apply("Via ParseAll", parseAll);
     }
 
     private static <T> Coder<T> inferCoder(
@@ -612,7 +681,10 @@ public class AvroIO {
       builder
           .addIfNotNull(
               DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
-          .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+          .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
+          .add(
+              DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"));
     }
   }
 
@@ -621,6 +693,7 @@ public class AvroIO {
   /** Implementation of {@link #parseAllGenericRecords}. */
   @AutoValue
   public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+    abstract EmptyMatchTreatment getEmptyMatchTreatment();
     abstract SerializableFunction<GenericRecord, T> getParseFn();
     @Nullable abstract Coder<T> getCoder();
     abstract long getDesiredBundleSizeBytes();
@@ -629,6 +702,7 @@ public class AvroIO {
 
     @AutoValue.Builder
     abstract static class Builder<T> {
+      abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
       abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
       abstract Builder<T> setCoder(Coder<T> coder);
       abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
@@ -636,6 +710,11 @@ public class AvroIO {
       abstract ParseAll<T> build();
     }
 
+    /** Like {@link Read#withEmptyMatchTreatment}. */
+    public ParseAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
     /** Specifies the coder for the result of the {@code parseFn}. */
     public ParseAll<T> withCoder(Coder<T> coder) {
       return toBuilder().setCoder(coder).build();
@@ -650,15 +729,21 @@ public class AvroIO {
     public PCollection<T> expand(PCollection<String> input) {
       final Coder<T> coder =
           Parse.inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
-      SerializableFunction<String, FileBasedSource<T>> createSource =
+      final SerializableFunction<GenericRecord, T> parseFn = getParseFn();
+      final EmptyMatchTreatment emptyMatchTreatment = getEmptyMatchTreatment();
+      final SerializableFunction<String, FileBasedSource<T>> createSource =
           new SerializableFunction<String, FileBasedSource<T>>() {
             @Override
             public FileBasedSource<T> apply(String input) {
-              return AvroSource.from(input).withParseFn(getParseFn(), coder);
+              return AvroSource.from(input)
+                  .withParseFn(parseFn, coder)
+                  .withEmptyMatchTreatment(emptyMatchTreatment);
             }
           };
+      Match.Filepatterns matchFilepatterns =
+          Match.filepatterns().withEmptyMatchTreatment(emptyMatchTreatment);
       return input
-          .apply(Match.filepatterns())
+          .apply(matchFilepatterns)
           .apply(
               "Parse all via FileBasedSource",
               new ReadAllViaFileBasedSource<>(
@@ -671,7 +756,11 @@ public class AvroIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      builder.add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+      builder
+          .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
+          .add(
+              DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/82b08523/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 8dd3125..2600d76 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -211,6 +212,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPattern) {
     return new AvroSource<>(
         fileNameOrPattern,
+        EmptyMatchTreatment.DISALLOW,
         DEFAULT_MIN_BUNDLE_SIZE,
         readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));
   }
@@ -220,11 +222,21 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     return from(ValueProvider.StaticValueProvider.of(fileNameOrPattern));
   }
 
+  /**
+   * Returns a copy of this source that uses the given {@link EmptyMatchTreatment}, which controls
+   * whether a filepattern matching no files is allowed.
+   */
+  public AvroSource<T> withEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment) {
+    return new AvroSource<>(
+        getFileOrPatternSpecProvider(), emptyMatchTreatment, getMinBundleSize(), mode);
+  }
+
   /** Reads files containing records that conform to the given schema. */
   public AvroSource<GenericRecord> withSchema(String schema) {
     checkNotNull(schema, "schema");
     return new AvroSource<>(
         getFileOrPatternSpecProvider(),
+        getEmptyMatchTreatment(),
         getMinBundleSize(),
         readGenericRecordsWithSchema(schema));
   }
@@ -240,6 +251,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     checkNotNull(clazz, "clazz");
     return new AvroSource<>(
         getFileOrPatternSpecProvider(),
+        getEmptyMatchTreatment(),
         getMinBundleSize(),
         readGeneratedClasses(clazz));
   }
@@ -254,6 +266,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
-    checkNotNull(parseFn, "coder");
+    checkNotNull(coder, "coder");
     return new AvroSource<>(
         getFileOrPatternSpecProvider(),
+        getEmptyMatchTreatment(),
         getMinBundleSize(),
         parseGenericRecords(parseFn, coder));
   }
@@ -263,15 +276,17 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    * minBundleSize} and its use.
    */
   public AvroSource<T> withMinBundleSize(long minBundleSize) {
-    return new AvroSource<>(getFileOrPatternSpecProvider(), minBundleSize, mode);
+    return new AvroSource<>(
+        getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), minBundleSize, mode);
   }
 
   /** Constructor for FILEPATTERN mode. */
   private AvroSource(
       ValueProvider<String> fileNameOrPattern,
+      EmptyMatchTreatment emptyMatchTreatment,
       long minBundleSize,
       Mode<T> mode) {
-    super(fileNameOrPattern, minBundleSize);
+    super(fileNameOrPattern, emptyMatchTreatment, minBundleSize);
     this.mode = mode;
   }
 
@@ -463,7 +478,8 @@ public class AvroSource<T> extends BlockBasedSource<T> {
         return new AvroSource<>(
             getSingleFileMetadata(), getMinBundleSize(), getStartOffset(), getEndOffset(), mode);
       case FILEPATTERN:
-        return new AvroSource<>(getFileOrPatternSpecProvider(), getMinBundleSize(), mode);
+        return new AvroSource<>(
+            getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), getMinBundleSize(), mode);
       default:
         throw new InvalidObjectException(
             String.format("Unknown mode %s for AvroSource %s", getMode(), this));

http://git-wip-us.apache.org/repos/asf/beam/blob/82b08523/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
index 25e8483..ec4f4ad 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -63,18 +64,35 @@ import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 public abstract class BlockBasedSource<T> extends FileBasedSource<T> {
   /**
    * Creates a {@code BlockBasedSource} based on a file name or pattern. Subclasses must call this
-   * constructor when creating a {@code BlockBasedSource} for a file pattern. See
-   * {@link FileBasedSource} for more information.
+   * constructor when creating a {@code BlockBasedSource} for a file pattern. See {@link
+   * FileBasedSource} for more information.
+   */
+  public BlockBasedSource(
+      String fileOrPatternSpec, EmptyMatchTreatment emptyMatchTreatment, long minBundleSize) {
+    this(StaticValueProvider.of(fileOrPatternSpec), emptyMatchTreatment, minBundleSize);
+  }
+
+  /**
+   * Like {@link #BlockBasedSource(String, EmptyMatchTreatment, long)} but with a default {@link
+   * EmptyMatchTreatment} of {@link EmptyMatchTreatment#DISALLOW}.
    */
   public BlockBasedSource(String fileOrPatternSpec, long minBundleSize) {
-    super(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
+    this(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
   }
 
   /** Like {@link #BlockBasedSource(String, long)}. */
   public BlockBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
-    super(fileOrPatternSpec, minBundleSize);
+    this(fileOrPatternSpec, EmptyMatchTreatment.DISALLOW, minBundleSize);
   }
 
+  /** Like {@link #BlockBasedSource(String, EmptyMatchTreatment, long)}. */
+  public BlockBasedSource(
+      ValueProvider<String> fileOrPatternSpec,
+      EmptyMatchTreatment emptyMatchTreatment,
+      long minBundleSize) {
+    super(fileOrPatternSpec, emptyMatchTreatment, minBundleSize);
+  }
+
   /**
    * Creates a {@code BlockBasedSource} for a single file. Subclasses must call this constructor
    * when implementing {@link BlockBasedSource#createForSubrangeOfFile}. See documentation in

http://git-wip-us.apache.org/repos/asf/beam/blob/82b08523/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index f835fa4..dabda84 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -154,6 +154,10 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     return fileOrPatternSpec;
   }
 
+  public final EmptyMatchTreatment getEmptyMatchTreatment() {
+    return emptyMatchTreatment;
+  }
+
   public final Mode getMode() {
     return mode;
   }